日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

限流与熔断

發(fā)布時(shí)間:2024/8/26 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 限流与熔断 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
  • 緣由
  • 主要功能
    • 簡易熔斷工具
    • reject反饋限流工具
    • 根據(jù)負(fù)載動(dòng)態(tài)限流
  • 用法
    • 引入 pom
    • 新增 component-scan
    • 構(gòu)建 DynamicLimiter
      • 策略
    • 事件
    • 監(jiān)控服務(wù)調(diào)用結(jié)果
      • 熔斷策略
      • reject 反饋限流策略
      • 根據(jù)負(fù)載動(dòng)態(tài)限流
    • 調(diào)用限流熔斷判斷邏輯
    • QConfig 配置?
      • 全局開關(guān)
      • 滑動(dòng)窗口
      • 根據(jù)負(fù)載動(dòng)態(tài)限流策略
      • reject 反饋動(dòng)態(tài)限流策略
      • 熔斷策略
      • Load 監(jiān)控策略
      • 配置異常則報(bào)警
      • 匯總
    • 監(jiān)控
  • 應(yīng)用效果
    • reject 反饋限流策略

概述

?

為了保證系統(tǒng)不被突發(fā)流量擊垮,進(jìn)行容量保障是十分有必要的。從架構(gòu)的穩(wěn)定性角度看,在有限資源的情況下,所能提供的單位時(shí)間服務(wù)能力也是有限的。假如超過承受能力,可能會(huì)帶來整個(gè)服務(wù)的停頓,應(yīng)用的Crash,進(jìn)而可能將風(fēng)險(xiǎn)傳遞給服務(wù)調(diào)用方造成整個(gè)系統(tǒng)的服務(wù)能力喪失,進(jìn)而引發(fā)雪崩。

?

為了避免系統(tǒng)壓力大時(shí)引發(fā)服務(wù)雪崩,就需要在系統(tǒng)中引入限流,降級(jí)和熔斷等工具。

?

目的

?

最初在基礎(chǔ)數(shù)據(jù)內(nèi)部,我們使用了限流組件,然后調(diào)研了降級(jí)熔斷框架Hystrix,發(fā)現(xiàn)這是一個(gè)很好的框架,能夠提升依賴大量外部服務(wù)的系統(tǒng)的容錯(cuò)能力,但是不適合依賴比較簡單的基礎(chǔ)數(shù)據(jù)系統(tǒng)。

?

機(jī)票交易系統(tǒng)依賴諸多子系統(tǒng),比如保險(xiǎn),各種X產(chǎn)品等。故而,在此簡單介紹一下,推廣給交易和報(bào)價(jià)的同學(xué)使用。

?

限流

?

根據(jù)排隊(duì)理論,具有延遲的服務(wù)隨著請求量的不斷提升,其平均響應(yīng)時(shí)間也會(huì)迅速提升,為了保證服務(wù)的SLA,有必要控制單位時(shí)間的請求量。這就是限流為什么愈發(fā)重要的原因。

?

分類

?

qps限流

?

限制每秒處理請求數(shù)不超過閾值。

?

并發(fā)限流

?

限制同時(shí)處理的請求數(shù)目。Java 中的 Semaphore 是做并發(fā)限制的好工具,特別適用于資源有效的場景。

?

單機(jī)限流

?

Guava 中的 RateLimiter。

?

集群限流

?

TC 提供的 common-blocking 組件提供此功能。

?

算法

?

漏桶算法

?

漏桶算法思路很簡單,水(請求)先進(jìn)入到漏桶里,漏桶以一定的速度出水,當(dāng)水流入速度過大會(huì)直接溢出,可以看出漏桶算法能強(qiáng)行限制數(shù)據(jù)的傳輸速率。

?

?

旗艦店運(yùn)價(jià)庫抓取調(diào)度

?

在旗艦店運(yùn)價(jià)庫對(duì)航司報(bào)價(jià)的離線抓取中,由于航司接口有qps限制,需要限制訪問速率。所以我們借助redis隊(duì)列作為漏桶,實(shí)現(xiàn)了對(duì)航司接口訪問速率的控制。

?

?

令牌桶算法

?

對(duì)于很多應(yīng)用場景來說,除了要求能夠限制數(shù)據(jù)的平均傳輸速率外,還要求允許某種程度的突發(fā)傳輸。這時(shí)候漏桶算法可能就不合適了,令牌桶算法更為適合。
令牌桶算法的原理是系統(tǒng)會(huì)以一個(gè)恒定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個(gè)令牌,當(dāng)桶里沒有令牌可取時(shí),則拒絕服務(wù)。

?

?

在 Guava 的 RateLimiter 中,使用的就是令牌桶算法,允許部分突發(fā)流量傳輸。在其源碼里,可以看到能夠突發(fā)傳輸?shù)牧髁康扔?maxBurstSeconds * qps。

?

/** ?* This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling. ?* The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in ?* terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10 ?* seconds, we can save up to 2 * 10 = 20 permits. ?*/ static?final?class?SmoothBursty?extends?SmoothRateLimiter { ??/** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */ ??final?double?maxBurstSeconds; ??SmoothBursty(SleepingStopwatch stopwatch,?double?maxBurstSeconds) { ????super(stopwatch); ????this.maxBurstSeconds = maxBurstSeconds; ??}

?

Guava 里除了允許突發(fā)傳輸?shù)?SmoothBursty, 還有于此相反的,可以緩慢啟動(dòng)的 SmoothWarmingUp。

?

滑動(dòng)窗口

?

  • TC 提供的 common-blocking 限流組件,背后采用滑動(dòng)窗口來計(jì)算流量是否超出限制。
  • 熔斷框架 Hystrix 也采用滑動(dòng)窗口來統(tǒng)計(jì)服務(wù)調(diào)用信息。
  • ?

    RxJava 實(shí)現(xiàn)滑動(dòng)窗口計(jì)數(shù)

    ?

    RxJava 是一個(gè)響應(yīng)式函數(shù)編程框架,以觀察者模式為骨架,能夠輕松實(shí)現(xiàn)滑動(dòng)窗口計(jì)數(shù)功能。具體示例代碼如下:

    ?

    package?com.rxjava.test; import?org.slf4j.Logger; import?org.slf4j.LoggerFactory; import?rx.Observable; import?rx.functions.Func1; import?rx.functions.Func2; import?rx.subjects.PublishSubject; import?rx.subjects.SerializedSubject; import?java.util.concurrent.TimeUnit; /** ?* 模擬滑動(dòng)窗口計(jì)數(shù) ?* Created by albon on 17/6/24. ?*/ public?class?RollingWindowTest { ????private?static?final?Logger logger = LoggerFactory.getLogger(WindowTest.class); ????public?static?final?Func2<Integer, Integer, Integer> INTEGER_SUM = ????????????(integer, integer2) -> integer + integer2; ????public?static?final?Func1<Observable<Integer>, Observable<Integer>> WINDOW_SUM = ????????????window -> window.scan(0, INTEGER_SUM).skip(3); ????public?static?final?Func1<Observable<Integer>, Observable<Integer>> INNER_BUCKET_SUM = ????????????integerObservable -> integerObservable.reduce(0, INTEGER_SUM); ????public?static?void?main(String[] args)?throws?InterruptedException { ????????PublishSubject<Integer> publishSubject = PublishSubject.create(); ????????SerializedSubject<Integer, Integer> serializedSubject = publishSubject.toSerialized(); ????????serializedSubject ????????????????.window(5, TimeUnit.SECONDS)?// 5秒作為一個(gè)基本塊 ????????????????.flatMap(INNER_BUCKET_SUM)???????????// 基本塊內(nèi)數(shù)據(jù)求和 ????????????????.window(3,?1)??????????????// 3個(gè)塊作為一個(gè)窗口,滾動(dòng)布數(shù)為1 ????????????????.flatMap(WINDOW_SUM)?????????????????// 窗口數(shù)據(jù)求和 ????????????????.subscribe((Integer integer) -> ????????????????????????logger.info("[{}] call ...... {}", ????????????????????????Thread.currentThread().getName(), integer)); ????????// 緩慢發(fā)送數(shù)據(jù),觀察效果 ????????for?(int?i=0; i<100; ++i) { ????????????if?(i <?30) { ????????????????serializedSubject.onNext(1); ????????????}?else?{ ????????????????serializedSubject.onNext(2); ????????????} ????????????Thread.sleep(1000); ????????} ????} }

    ?

    降級(jí)和熔斷

    ?

    降級(jí)

    ?

    業(yè)務(wù)降級(jí),是指犧牲非核心的業(yè)務(wù)功能,保證核心功能的穩(wěn)定運(yùn)行。簡單來說,要實(shí)現(xiàn)優(yōu)雅的業(yè)務(wù)降級(jí),需要將功能實(shí)現(xiàn)拆分到相對(duì)獨(dú)立的不同代碼單元,分優(yōu)先級(jí)進(jìn)行隔離。在后臺(tái)通過開關(guān)控制,降級(jí)部分非主流程的業(yè)務(wù)功能,減輕系統(tǒng)依賴和性能損耗,從而提升集群的整體吞吐率。

    ?

    降級(jí)的重點(diǎn)是:業(yè)務(wù)之間有優(yōu)先級(jí)之分。

    ?

    降級(jí)的典型應(yīng)用是:電商活動(dòng)期間。

    ?

    熔斷

    ?

    老式電閘都安裝了保險(xiǎn)絲,一旦有人使用超大功率的設(shè)備,保險(xiǎn)絲就會(huì)燒斷以保護(hù)各個(gè)電器不被強(qiáng)電流給燒壞。同理我們的接口也需要安裝上“保險(xiǎn)絲”,以防止非預(yù)期的請求對(duì)系統(tǒng)壓力過大而引起的系統(tǒng)癱瘓,當(dāng)流量過大時(shí),可以采取拒絕或者引流等機(jī)制。

    ?

    同樣在分布式系統(tǒng)中,當(dāng)被調(diào)用的遠(yuǎn)程服務(wù)無法使用時(shí),如果沒有過載保護(hù),就會(huì)導(dǎo)致請求的資源阻塞在遠(yuǎn)程服務(wù)器上耗盡資源。很多時(shí)候,剛開始可能只是出現(xiàn)了局部小規(guī)模的故障,然而由于種種原因,故障影響范圍越來越大,最終導(dǎo)致全局性的后果。這種過載保護(hù),就是熔斷器。

    ?

    熔斷器的設(shè)計(jì)思路

    ?

    ?

  • Closed:初始狀態(tài),熔斷器關(guān)閉,正常提供服務(wù)
  • Open: 失敗次數(shù),失敗百分比達(dá)到一定的閾值之后,熔斷器打開,停止訪問服務(wù)
  • Half-Open:熔斷一定時(shí)間之后,小流量嘗試調(diào)用服務(wù),如果成功則恢復(fù),熔斷器變?yōu)镃losed狀態(tài)
  • ?

    降級(jí)熔斷相似點(diǎn)

    ?

  • 目的一致,都是從可用性可靠性著想,為防止系統(tǒng)的整體緩慢甚至崩潰,采用的技術(shù)手段
  • 最終表現(xiàn)類似,對(duì)于兩者來說,最終讓用戶體驗(yàn)到的是某些功能暫時(shí)不可達(dá)或不可用
  • 粒度一般都是服務(wù)級(jí)別
  • 自治性要求很高,熔斷模式一般都是服務(wù)基于策略的自動(dòng)觸發(fā),降級(jí)雖說可人工干預(yù),但在微服務(wù)架構(gòu)下,完全靠人顯然不可能,開關(guān)預(yù)置、配置中心都是必要手段
  • ?

    降級(jí)熔斷區(qū)別

    ?

  • 觸發(fā)原因不一樣,服務(wù)熔斷一般是某個(gè)服務(wù)(下游服務(wù))故障引起,而服務(wù)降級(jí)一般是從整體負(fù)荷考慮
  • 自愈能力要求不一樣,服務(wù)熔斷在發(fā)生后有自愈能力,而服務(wù)降級(jí)沒有該職責(zé)
  • ?

    相關(guān)框架組件

    ?

    common-blocking 限流組件

    ?

    http://wiki.corp.qunar.com/display/devwiki/common-blocking

    ?

    優(yōu)點(diǎn)

    ?

  • qconfig 控制,靈活方便
  • 支持同步/異步模式,異步模式不增加接口響應(yīng)時(shí)間
  • 支持HTTP, Dubbo接口
  • 集群級(jí)別的限流
  • 支持自定義更細(xì)的限流粒度
  • ?

    dubbo 熔斷

    ?

    http://wiki.corp.qunar.com/pages/viewpage.action?pageId=105912517

    ?

    ?

    hystrix 降級(jí)熔斷

    ?

    適用對(duì)象

    ?

    依賴大量外部服務(wù)的業(yè)務(wù)

    ?

    ?

    主要功能

    ?

  • 服務(wù)故障時(shí),觸發(fā)熔斷,快速失敗,而非排隊(duì)處理,占用系統(tǒng)資源
  • 支持線程池隔離,避免個(gè)別依賴服務(wù)耗光線程而影響其他服務(wù)
  • 提供兜底策略,以盡量減少失敗
  • 通過限制線程池?cái)?shù)目,或者信號(hào)量最大并發(fā)數(shù),可以達(dá)到限流的目的
  • 支持對(duì)結(jié)果進(jìn)行Cache
  • ?


    圖中流程的說明:

    ?

  • 將遠(yuǎn)程服務(wù)調(diào)用邏輯封裝進(jìn)一個(gè)HystrixCommand。
  • 對(duì)于每次服務(wù)調(diào)用可以使用同步或異步機(jī)制,對(duì)應(yīng)執(zhí)行execute()或queue()。
  • 判斷熔斷器(circuit-breaker)是否打開或者半打開狀態(tài),如果打開跳到步驟8,進(jìn)行回退策略,如果關(guān)閉進(jìn)入步驟4。
  • 判斷線程池/隊(duì)列/信號(hào)量(使用了艙壁隔離模式)是否跑滿,如果跑滿進(jìn)入回退步驟8,否則繼續(xù)后續(xù)步驟5。
  • run方法中執(zhí)行了實(shí)際的服務(wù)調(diào)用。
  • 服務(wù)調(diào)用發(fā)生超時(shí)時(shí),進(jìn)入步驟8。
  • 判斷run方法中的代碼是否執(zhí)行成功。
  • 執(zhí)行成功返回結(jié)果。
  • 執(zhí)行中出現(xiàn)錯(cuò)誤則進(jìn)入步驟8。
  • 所有的運(yùn)行狀態(tài)(成功,失敗,拒絕,超時(shí))上報(bào)給熔斷器,用于統(tǒng)計(jì)從而影響熔斷器狀態(tài)。
  • 進(jìn)入getFallback()回退邏輯。
  • 沒有實(shí)現(xiàn)getFallback()回退邏輯的調(diào)用將直接拋出異常。
  • 回退邏輯調(diào)用成功直接返回。
  • 回退邏輯調(diào)用失敗拋出異常。
  • 返回執(zhí)行成功結(jié)果。
  • ?

    命令模式

    ?

    hystrix 使用命令模式對(duì)服務(wù)調(diào)用進(jìn)行封裝,要想使用 hystrix,只需要繼承 HystrixCommand 即可。

    ?

    public?class?CommandHelloFailure?extends?HystrixCommand<String> { ????public?CommandHelloFailure() { ???????super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(HystrixCommandEnum.AV_QUERY.GROUP.KEY)) ????????????????.andCommandKey(HystrixCommandKey.Factory.asKey(HystrixCommandEnum.AV_QUERY.KEY)) ????????????????.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("AvPool")) ????????????????.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter() ????????????????????????.withCoreSize(2)//ThreadPoolExecutor maximumPoolSize=maxnumsize=10 ????????????????????????.withMaxQueueSize(-1)//線程打滿后,直接快速拒絕。 ????????????????) ????????????????.andCommandPropertiesDefaults(HystrixCommandProperties.Setter() ????????????????????????.withExecutionTimeoutInMilliseconds(10000)//run方法執(zhí)行時(shí)間,超過一秒就會(huì)被cancel. ????????????????????????.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD) ????????????????????????.withCircuitBreakerRequestVolumeThreshold(30) ????????????????????????.withCircuitBreakerErrorThresholdPercentage(50)//錯(cuò)誤率達(dá)到50%后,開啟熔斷. ????????????????????????.withCircuitBreakerSleepWindowInMilliseconds(5000)//熔斷后休眠5000毫秒后發(fā)起重試. ????????????????));????????this.name = name; ????} ????@Override ????protected?String run() { ????????throw?new?RuntimeException("this command always fails"); ????} ????@Override ????protected?String getFallback() { ????????return?"Hello Failure "?+ name +?"!"; ????} }

    ?

    HystrixCommand有4個(gè)方法可供調(diào)用

    ?

    方法名

    返回值

    含義

    execute

    run方法的返回值

    同步阻塞執(zhí)行

    queue

    Future

    異步非阻塞執(zhí)行

    observe

    Observable<?>

    事件注冊前執(zhí)行

    toObservable

    Observable<?>

    事件注冊后執(zhí)行

    ?

    熔斷判斷邏輯關(guān)鍵源代碼

    ?

    類名

    說明

    HystrixCircuitBreaker

    熔斷判斷類

    HealthCountsStream

    Command 執(zhí)行結(jié)果統(tǒng)計(jì)類,主要提供了 HealthCounts 對(duì)象作為統(tǒng)計(jì)結(jié)果,用于 HystrixCircuitBreaker?中進(jìn)行熔斷判斷

    BucketedRollingCounterStream?
    BucketedCounterStream

    滑動(dòng)窗口計(jì)數(shù)類

    HystrixCommandCompletionStream

    Command 執(zhí)行結(jié)果數(shù)據(jù)發(fā)布類

    ?

    無線同學(xué)封裝的 qhystrix

    ?

    無線平臺(tái)的同學(xué),對(duì)hystrix進(jìn)行了適當(dāng)?shù)姆庋b,能夠和公司的公共組件進(jìn)行配合使用,十分方便。

    ?

  • 引入了 QConfig?動(dòng)態(tài)配置,方便線上靈活修改服務(wù)配置,更多配置相關(guān)資料可以查看官方文檔?。

    scheduledMonitorOpen=true #av_pool線程池核心線程數(shù)目 hystrix.threadpool.av_pool.coreSize=10 #av_pool線程池隊(duì)列大小 hystrix.threadpool.av_pool.queueSizeRejectionThreshold=2 #執(zhí)行策略,以信號(hào)量/線程池的方式執(zhí)行 hystrix.command.av_query.execution.isolation.strategy=THREAD #超時(shí)是否啟用 hystrix.command.av_query.execution.timeout.enabled=true #當(dāng)超時(shí)的時(shí)候是否中斷(interrupt) HystrixCommand.run()執(zhí)行 hystrix.command.av_query.execution.isolation.thread.interruptOnTimeout=true #超時(shí)時(shí)間,支持線程池/信號(hào)量 hystrix.command.av_query.execution.isolation.thread.timeoutInMilliseconds=10000 #統(tǒng)計(jì)執(zhí)行數(shù)據(jù)的時(shí)間窗口,單位毫秒【此配置修改后需要重啟才能生效】 hystrix.command.av_query.metrics.rollingStats.timeInMilliseconds=10000 #觸發(fā)熔斷器開啟的窗口時(shí)間需要達(dá)到的最小請求數(shù) hystrix.command.av_query.circuitBreaker.requestVolumeThreshold=10 #熔斷器保持開啟狀態(tài)時(shí)間。默認(rèn)為5000毫秒,則熔斷器中斷請求5秒后會(huì)進(jìn)入半打開狀態(tài),放部分流量過去重試 hystrix.command.av_query.circuitBreaker.sleepWindowInMilliseconds=10000 #窗口時(shí)間內(nèi)的出錯(cuò)率,默認(rèn)為50,則達(dá)到50%出錯(cuò)率時(shí),熔斷開啟 hystrix.command.av_query.circuitBreaker.errorThresholdPercentage=50
  • 引入了 Watcher 監(jiān)控,方便觀察服務(wù)運(yùn)行情況,以及添加報(bào)警。

    hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SUCCESS_Count=6 hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SHORT_CIRCUITED_Time=0 hystrix.CommandKey_av_query.cluster.RT_FALLBACK_SUCCESS_Count=64 JVM_Thread_Count=147 hystrix.CommandKey_av_query.cluster.RT_SUCCESS_Time=0 hystrix.CommandKey_av_query.cluster.RT_FAILURE_Count=25 JVM_PS_MarkSweep_Count=0 Fare_Rule_Change_Task_Queue_Size_Value=0 hystrix.CommandKey_av_query.cluster.RT_SHORT_CIRCUITED_Time=0 hystrix.CommandKey_av_query.cluster.RT_SHORT_CIRCUITED_Count=39 hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FAILURE_Count=25 hystrix.CommandKey_av_query.cluster.RT_SUCCESS_Count=6 hystrix.CommandKey_av_query.cluster.RT_FALLBACK_SUCCESS_Time=0 hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FALLBACK_SUCCESS_Time=0 Av_Change_Task_Queue_Size_Value=0 hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FALLBACK_SUCCESS_Count=64 hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_FAILURE_Time=0 hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SUCCESS_Time=0 no_y_email_task_Time=0 no_y_queue_empty_Count=0 hystrix.CommandKey_av_query.cluster.RT_FAILURE_Time=0 JVM_PS_Scavenge_Count=0 no_y_email_task_Count=0 hystrix.CommandKey_av_query.l-flightdata3-f-dev-cn0.RT_SHORT_CIRCUITED_Count=39 no_y_queue_empty_Time=0
  • ?

    注解驅(qū)動(dòng)開發(fā)

    ?

    注解是 Java 語言的一大優(yōu)勢,很多開發(fā)框架比如 Spring, Hibernate 都將這一優(yōu)勢使用到了極致。所以呢,官方?也提供了基于注解進(jìn)行開發(fā)的 hystrix-javanica?包,能夠減少大量繁瑣代碼的編寫。

    ?

    <dependency> ????<groupId>com.netflix.hystrix</groupId> ????<artifactId>hystrix-javanica</artifactId> ????<version>1.5.11</version> </dependency>

    ?

    <aop:aspectj-autoproxy/> <bean id="hystrixAspect"?class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect"></bean>

    ?

    在POM引入jar包,并且配置spring aop即可。示例代碼如下:

    ?

    package?com.qunar.flight.farecore.storage.service.av; import?com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import?com.netflix.hystrix.strategy.HystrixPlugins; import?com.qunar.mobile.hystrix.config.HystrixConfig; import?com.qunar.mobile.hystrix.config.QconfigUtils; import?com.qunar.mobile.hystrix.monitor.HystrixScheduledMonitor; import?com.qunar.mobile.hystrix.monitor.QunarHystrixCommandExecutionHook; import?org.slf4j.Logger; import?org.slf4j.LoggerFactory; import?org.springframework.stereotype.Service; import?qunar.tc.qconfig.client.Configuration; import?javax.annotation.PostConstruct; import?java.util.Map; /** ?* @author albon ?*???????? Date: 17-6-30 ?*???????? Time: 上午10:44 ?*/ @Service public?class?TestHystrixService { ????private?static?final?Logger logger = LoggerFactory.getLogger(TestHystrixService.class); ????@PostConstruct ????public?void?init() { ????????logger.info("init execution hook"); ????????HystrixPlugins.getInstance().registerCommandExecutionHook(new?QunarHystrixCommandExecutionHook()); ????????QconfigUtils.getInstance().getHystrixMapConfig() ????????????????.addListener(new?Configuration.ConfigListener<Map<String, String>>() { ????????????????????@Override ????????????????????public?void?onLoad(Map<String, String> conf) { ????????????????????????boolean?isOpen = Boolean.parseBoolean(conf.get("scheduledMonitorOpen")); ????????????????????????HystrixScheduledMonitor.getInstance().setOpen(isOpen); ????????????????????????// qconfig熱發(fā)修改配置 ????????????????????????HystrixConfig.getInstance().setConfig(conf); ????????????????????} ????????????????}); ????} ????@HystrixCommand(commandKey =?"av_query", groupKey =?"av_group", threadPoolKey =?"av_pool", fallbackMethod =?"queryFallback") ????public?String query() { ????????return?String.format("[%s] RUN SUCCESS", Thread.currentThread().getName()); ????} ????public?String queryFallback() { ????????return?String.format("[%s] FALLBACK", Thread.currentThread().getName()); ????} }

    ?

    注意:因?yàn)樽⒔夥绞讲辉倮^承quanr-hystrix里的AbstractHystrixCommand,故這里特意在 PostContruct 方法里,注冊了 Hook 類,以及對(duì) QConfig 配置進(jìn)行監(jiān)聽。

    ?

    在使用時(shí),曾經(jīng)遇到一個(gè)小問題,異常信息如下:

    ?

    Caused by: org.aspectj.apache.bcel.classfile.ClassFormatException: Invalid?byte?tag in constant pool:?18 ????at org.aspectj.apache.bcel.classfile.ClassParser.readConstantPool(ClassParser.java:192) ~[aspectjweaver-1.6.10.jar:1.6.10] ????at org.aspectj.apache.bcel.classfile.ClassParser.parse(ClassParser.java:131) ~[aspectjweaver-1.6.10.jar:1.6.10] ????at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadJavaClass(NonCachingClassLoaderRepository.java:262) ~[aspectjweaver-1.6.10.jar:1.6.10] ????at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadClass(NonCachingClassLoaderRepository.java:242) ~[aspectjweaver-1.6.10.jar:1.6.10] ????at org.aspectj.apache.bcel.util.NonCachingClassLoaderRepository.loadClass(NonCachingClassLoaderRepository.java:249) ~[aspectjweaver-1.6.10.jar:1.6.10] ????at org.aspectj.weaver.reflect.Java15AnnotationFinder.getAnnotations(Java15AnnotationFinder.java:202) ~[aspectjweaver-1.6.10.jar:1.6.10] ????at org.aspectj.weaver.reflect.ReflectionBasedResolvedMemberImpl.unpackAnnotations(ReflectionBasedResolvedMemberImpl.java:211) ~[aspectjweaver-1.6.10.jar:1.6.10] ????at org.aspectj.weaver.reflect.ReflectionBasedResolvedMemberImpl.hasAnnotation(ReflectionBasedResolvedMemberImpl.java:163) ~[aspectjweaver-1.6.10.jar:1.6.10] ????at org.aspectj.weaver.patterns.ExactAnnotationTypePattern.matches(ExactAnnotationTypePattern.java:109) ~[aspectjweaver-1.6.10.jar:1.6.10] ????at org.aspectj.weaver.patterns.ExactAnnotationTypePattern.matches(ExactAnnotationTypePattern.java:96) ~[aspectjweaver-1.6.10.jar:1.6.10]

    ?

    通過升級(jí)aspectjweaver包到最新版解決

    ?

    <dependency> ????<groupId>org.aspectj</groupId> ????<artifactId>aspectjweaver</artifactId> ????<version>1.8.10</version> </dependency>

    ?

    更詳細(xì)的資料可以查看官方文檔。

    ?

    資料

    ?

    qunar-hystrixhttp://gitlab.corp.qunar.com/mobile_public/qunar-hystrix/wikis/start
    官方文檔https://github.com/Netflix/Hystrix/wiki
    javanica文檔?https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-javanica

    使用中遇到的坑

    commons-configuration 包版本低引起的 NPE 異常

    報(bào)價(jià)同學(xué)在其系統(tǒng)中引入時(shí),遇到了很奇怪的 NPE 異常

    java.lang.NullPointerException ????????at com.netflix.config.ConcurrentMapConfiguration.clearConfigurationListeners(ConcurrentMapConfiguration.java:330) ????????at org.apache.commons.configuration.event.EventSource.<init>(EventSource.java:76) ????????at org.apache.commons.configuration.AbstractConfiguration.<init>(AbstractConfiguration.java:63) ????????at com.netflix.config.ConcurrentMapConfiguration.<init>(ConcurrentMapConfiguration.java:68) ????????at com.netflix.config.ConcurrentCompositeConfiguration.<init>(ConcurrentCompositeConfiguration.java:172) ????????at com.netflix.config.ConfigurationManager.getConfigInstance(ConfigurationManager.java:125) ????????at com.netflix.config.DynamicPropertyFactory.getInstance(DynamicPropertyFactory.java:263) ????????at com.netflix.config.DynamicProperty.getInstance(DynamicProperty.java:245) ????????at com.netflix.config.PropertyWrapper.<init>(PropertyWrapper.java:58) ????????at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius$ArchaiusDynamicProperty.<init>(HystrixDynamicPropertiesArchaius.java:62) ????????at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius$StringDynamicProperty.<init>(HystrixDynamicPropertiesArchaius.java:73) ????????at com.netflix.hystrix.strategy.properties.archaius.HystrixDynamicPropertiesArchaius.getString(HystrixDynamicPropertiesArchaius.java:34) ????????at com.netflix.hystrix.strategy.HystrixPlugins.getPluginImplementationViaProperties(HystrixPlugins.java:344) ????????at com.netflix.hystrix.strategy.HystrixPlugins.getPluginImplementation(HystrixPlugins.java:334) ????????at com.netflix.hystrix.strategy.HystrixPlugins.getPropertiesStrategy(HystrixPlugins.java:243) ????????at com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory.getCommandProperties(HystrixPropertiesFactory.java:62) ????????at com.netflix.hystrix.AbstractCommand.initCommandProperties(AbstractCommand.java:204) ????????at com.netflix.hystrix.AbstractCommand.<init>(AbstractCommand.java:163) ????????at com.netflix.hystrix.HystrixCommand.<init>(HystrixCommand.java:147) ????????at com.netflix.hystrix.HystrixCommand.<init>(HystrixCommand.java:133)

    ConcurrentMapConfiguration.clearConfigurationListeners 方法內(nèi)容如下:

    public?class?ConcurrentMapConfiguration?extends?AbstractConfiguration { ????protected?ConcurrentHashMap<String,Object> map; ????private?Collection<ConfigurationListener> listeners =?new?CopyOnWriteArrayList<ConfigurationListener>();? ????@Override ????public?void?clearConfigurationListeners() { ????????listeners.clear();?// NPE異常就在這一行 ????} }

    初看代碼,listeners 變量是有初始化語句的,為啥還會(huì)是 null 呢?我們繼續(xù)往上看?ConcurrentMapConfiguration 的父類,clearConfigurationListeners 方法覆蓋的是父類 EventSource 的同名方法。EventSource 代碼如下:

    public?class?EventSource { ????/** A collection for the registered event listeners. */ ????private?Collection listeners; ????/** A counter for the detail events. */ ????private?int?detailEvents; ????/** ?????* Creates a new instance of <code>EventSource</code>. ?????*/ ????public?EventSource() { ????????clearConfigurationListeners(); ????} ????/** ?????* Removes all registered configuration listeners. ?????*/ ????public?void?clearConfigurationListeners() { ????????listeners =?new?LinkedList(); ????}

    ?clearConfigurationListeners 方法是在父類的構(gòu)造函數(shù)中調(diào)用的,此時(shí)子類還沒有做初始化操作,listeners 也還沒有賦值,這就奇葩了。仔細(xì)觀察后發(fā)現(xiàn),EventSource 是?commons-configuration 包里的,而?ConcurrentMapConfiguration 是 archaius-core 包(Netflix 開源的基于java的配置管理類庫)的,archaius 中引用的?commons-configuration 是較高的版本 1.8。高版本的 EventSource 代碼有所不同,其構(gòu)造函數(shù)里沒有再調(diào)用?clearConfigurationListeners,而是調(diào)用?initListeners 方法。升級(jí) pom 中的?commons-configuration 版本后,系統(tǒng)運(yùn)行正常啦

    public?class?EventSource { ????/** A collection for the registered event listeners. */ ????private?Collection<ConfigurationListener> listeners; ????/** ?????* Creates a new instance of {@code EventSource}. ?????*/ ????public?EventSource() { ????????initListeners(); ????} ????/** ?????* Initializes the collections for storing registered event listeners. ?????*/ ????private?void?initListeners() { ????????listeners =?new?CopyOnWriteArrayList<ConfigurationListener>(); ????????errorListeners =?new?CopyOnWriteArrayList<ConfigurationErrorListener>(); ????}

    dynamic-limiter 組件

    設(shè)計(jì)目的

    使用 Hystrix 編程必須使用其命令模式,繼承 HystrixCommand 或 HystrixObservableCommand。對(duì)于程序中原先采用異步回調(diào)模式的代碼,使用 Hystrix 必須改造成同步模式,涉及到系統(tǒng)代碼的大量改造,并且隔離在單獨(dú)的線程池中,導(dǎo)致線程切換增加,影響性能。

    為此,我們在深入了解 Hystrix 原理之后,決定自己動(dòng)手寫一個(gè)降級(jí)熔斷工具 dynamic-limiter,提供以下兩種功能:

  • ?簡化給異步回調(diào)模式的代碼增加降級(jí)熔斷的功能的步驟。主要做法是,不再采用 Hystrix 僵硬的命令模式,分離服務(wù)調(diào)用結(jié)果的監(jiān)控和熔斷判斷邏輯。
  • ?針對(duì)接收消息->獲取資源→進(jìn)入計(jì)算隊(duì)列的編程模式,設(shè)計(jì)動(dòng)態(tài)限流組件,邏輯如下圖所示:
  • ? ? 3. 監(jiān)控系統(tǒng)負(fù)載,動(dòng)態(tài)設(shè)置限流閾值,原理如下:

    轉(zhuǎn)載于:https://www.cnblogs.com/DengGao/p/rateLimit.html

    總結(jié)

    以上是生活随笔為你收集整理的限流与熔断的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。