日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hystrix中的批量(折叠)请求

發布時間:2023/12/3 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hystrix中的批量(折叠)请求 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Hystrix具有折疊(或批處理)請求的高級功能。 如果兩個或多個命令同時運行相似的請求,Hystrix可以將它們組合在一起,運行一個批處理請求,并將拆分結果分派回所有命令。 首先讓我們看看Hystrix如何工作而不會崩潰。 假設我們有一個StockPrice給定Ticker StockPrice的服務:

import lombok.Value; import java.math.BigDecimal; import java.time.Instant;@Value class Ticker {String symbol; }@Value class StockPrice {BigDecimal price;Instant effectiveTime; }interface StockPriceGateway {default StockPrice load(Ticker stock) {final Set<Ticker> oneTicker = Collections.singleton(stock);return loadAll(oneTicker).get(stock);}ImmutableMap<Ticker, StockPrice> loadAll(Set<Ticker> tickers); }

為了方便起見, StockPriceGateway核心實現必須提供loadAll()批處理方法,而實現load()方法則是為了方便。 因此,我們的網關能夠批量加載多個價格(例如,以減少延遲或網絡協議開銷),但是目前我們不使用此功能,始終一次加載一個股票的價格:

class StockPriceCommand extends HystrixCommand<StockPrice> {private final StockPriceGateway gateway;private final Ticker stock;StockPriceCommand(StockPriceGateway gateway, Ticker stock) {super(HystrixCommandGroupKey.Factory.asKey("Stock"));this.gateway = gateway;this.stock = stock;}@Overrideprotected StockPrice run() throws Exception {return gateway.load(stock);} }

這樣的命令將始終為每個Ticker調用StockPriceGateway.load() ,如以下測試所示:

class StockPriceCommandTest extends Specification {def gateway = Mock(StockPriceGateway)def 'should fetch price from external service'() {given:gateway.load(TickerExamples.any()) >> StockPriceExamples.any()def command = new StockPriceCommand(gateway, TickerExamples.any())when:def price = command.execute()then:price == StockPriceExamples.any()}def 'should call gateway exactly once when running Hystrix command'() {given:def command = new StockPriceCommand(gateway, TickerExamples.any())when:command.execute()then:1 * gateway.load(TickerExamples.any())}def 'should call gateway twice when command executed two times'() {given:def commandOne = new StockPriceCommand(gateway, TickerExamples.any())def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())when:commandOne.execute()commandTwo.execute()then:2 * gateway.load(TickerExamples.any())}def 'should call gateway twice even when executed in parallel'() {given:def commandOne = new StockPriceCommand(gateway, TickerExamples.any())def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:2 * gateway.load(TickerExamples.any())}}

如果您不了解Hystrix,則通過將外部調用包裝在命令中可以獲得許多功能,例如超時,斷路器等。但這不是本文的重點。 看一下最后兩個測試:當兩次,順序或并行( queue() )兩次詢問任意行情的價格時,我們的外部gateway也被兩次調用。 上一次測試特別有趣–我們幾乎同時要求相同的報價,但Hystrix不能弄清楚。 這兩個命令是完全獨立的,將在不同的線程中執行,彼此之間一無所知-即使它們幾乎同時運行。

折疊就是找到類似的請求并將其組合。 批處理(我將這個術語與崩潰互換使用)不會自動發生,并且需要一些編碼。 但是首先讓我們看看它的行為:

def 'should collapse two commands executed concurrently for the same stock ticker'() {given:def anyTicker = TickerExamples.any()def tickers = [anyTicker] as Setand:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, anyTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:0 * gateway.load(_)1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any()) }def 'should collapse two commands executed concurrently for the different stock tickers'() {given:def anyTicker = TickerExamples.any()def otherTicker = TickerExamples.other()def tickers = [anyTicker, otherTicker] as Setand:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any(),otherTicker, StockPriceExamples.other()) }def 'should correctly map collapsed response into individual requests'() {given:def anyTicker = TickerExamples.any()def otherTicker = TickerExamples.other()def tickers = [anyTicker, otherTicker] as Setgateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any(),otherTicker, StockPriceExamples.other())and:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:def anyPrice = futureOne.get()def otherPrice = futureTwo.get()then:anyPrice == StockPriceExamples.any()otherPrice == StockPriceExamples.other() }

第一個測試證明,不是兩次調用load() ,而是幾乎一次調用loadAll() 。 還要注意,由于我們要求相同的Ticker (來自兩個不同的線程),因此loadAll()僅請求一個代碼。 第二個測試顯示兩個并發請求,兩個不同的行情收錄器被折疊為一個批處理調用。 第三次測試可確保我們仍能對每個單獨的請求得到正確的響應。 相反,延長HystrixCommand我們必須擴展更加復雜HystrixCollapser 。 現在是時候看到StockTickerPriceCollapsedCommand實現,它無縫替換了StockPriceCommand :

class StockTickerPriceCollapsedCommand extends HystrixCollapser<ImmutableMap<Ticker, StockPrice>, StockPrice, Ticker> {private final StockPriceGateway gateway;private final Ticker stock;StockTickerPriceCollapsedCommand(StockPriceGateway gateway, Ticker stock) {super(HystrixCollapser.Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Stock")).andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));this.gateway = gateway;this.stock = stock;}@Overridepublic Ticker getRequestArgument() {return stock;}@Overrideprotected HystrixCommand<ImmutableMap<Ticker, StockPrice>> createCommand(Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {final Set<Ticker> stocks = collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(toSet());return new StockPricesBatchCommand(gateway, stocks);}@Overrideprotected void mapResponseToRequests(ImmutableMap<Ticker, StockPrice> batchResponse, Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {collapsedRequests.forEach(request -> {final Ticker ticker = request.getArgument();final StockPrice price = batchResponse.get(ticker);request.setResponse(price);});}}

這里有很多事情要做,所以讓我們逐步回顧StockTickerPriceCollapsedCommand 。 前三種通用類型:

  • BatchReturnType (在我們的示例中為ImmutableMap<Ticker, StockPrice> )是批處理命令響應的類型。 如您將在后面看到的那樣,崩潰程序會將多個小命令變成批處理命令。 這是該批處理命令的響應的類型。 請注意,它與StockPriceGateway.loadAll()類型相同。
  • ResponseType ( StockPrice )是要折疊的每個命令的類型。 在我們的例子中,我們正在折疊HystrixCommand<StockPrice> 。 稍后,我們將BatchReturnType值BatchReturnType為多個StockPrice 。
  • RequestArgumentType ( Ticker )是我們將要折疊(批處理)的每個命令的輸入。 當多個命令一起批處理時,我們最終將所有這些替換為一個批處理命令。 此命令應接收所有單個請求,以便執行一個批處理請求。

withTimerDelayInMilliseconds(100)將在稍后說明。 createCommand()創建一個批處理命令。 此命令應替換所有單個命令并執行批處理邏輯。 在我們的情況下,我們不會進行多次單獨的load()調用:

class StockPricesBatchCommand extends HystrixCommand<ImmutableMap<Ticker, StockPrice>> {private final StockPriceGateway gateway;private final Set<Ticker> stocks;StockPricesBatchCommand(StockPriceGateway gateway, Set<Ticker> stocks) {super(HystrixCommandGroupKey.Factory.asKey("Stock"));this.gateway = gateway;this.stocks = stocks;}@Overrideprotected ImmutableMap<Ticker, StockPrice> run() throws Exception {return gateway.loadAll(stocks);} }

此類與StockPriceCommand之間的唯一區別是,它需要一堆Ticker并返回所有價格。 Hystrix將收集幾個StockTickerPriceCollapsedCommand實例,一旦它具有足夠的StockTickerPriceCollapsedCommand (稍后再介紹),它將創建一個StockPriceCommand 。 希望這很清楚,因為mapResponseToRequests()涉及的更多。 折疊后的StockPricesBatchCommand完成后,我們必須以某種方式拆分批處理響應,并將回復傳達回各個命令,而不會崩潰。 從這個角度來看, mapResponseToRequests()實現非常簡單:我們收到批處理響應和包裝CollapsedRequest<StockPrice, Ticker>的集合。 現在,我們必須遍歷所有正在等待的單個請求并完成它們( setResponse() )。 如果我們不完成某些請求,它們將無限期掛起并最終超時。

怎么運行的

這是描述如何實現折疊的正確時機。 我之前說過崩潰是在同時發生兩個請求時發生的。 沒有一樣的時間了 。 實際上,當第一個可折疊請求進入時,Hystrix會啟動一個計時器。 在我們的示例中,我們將其設置為100毫秒。 在此期間,我們的命令被掛起,等待其他命令加入。 在此可配置時間段之后,Hystrix將調用createCommand() ,收集所有請求鍵(通過調用getRequestArgument() )并運行它。 批處理命令完成后,它將使我們將結果分發給所有等待中的單個命令。 如果我們擔心創建龐大的批處理,也可以限制已折疊請求的數量–另一方面,在此短時間內可以容納多少并發請求?

用例和缺點

請求折疊應在承受高負載(請求頻率很高)的系統中使用。 如果每個折疊時間窗口(在示例中為100毫秒)僅收到一個請求,則折疊只會增加開銷。 這是因為每次您調用可折疊命令時,它都必須等待,以防萬一其他命令想要加入并形成批處理。 僅當至少折疊了幾個命令時,這才有意義。 節省網絡等待時間和/或更好地利用合作者中的資源可以平衡浪費的等待時間(與單個呼叫相比,批處理請求通常要快得多)。 但是請記住,折疊是一把雙刃劍,在特定情況下很有用。

最后要記住的一件事–為了使用請求折疊,您需要在try-finally塊中使用HystrixRequestContext.initializeContext()和shutdown() :

HystrixRequestContext context = HystrixRequestContext.initializeContext(); try {//... } finally {context.shutdown(); }

崩潰與緩存

您可能會認為可以通過適當的緩存來代替崩潰。 這不是真的。 在以下情況下使用緩存:

  • 資源可能會被多次訪問
  • 我們可以安全地使用先前的值,它會在一段時間內保持有效, 或者我們確切地知道如何使它無效
  • 我們可以提供對同一資源的并發請求以多次計算
  • 另一方面,折疊不會強制數據的局部性(1),它總是命中實際服務,并且永遠不會返回陳舊的數據(2)。 最后,如果我們從多個線程中請求相同的資源,我們將僅調用一次備份服務(3)。 在進行緩存的情況下,除非您的緩存真的很聰明,否則兩個線程將獨立地發現緩存中沒有給定的資源,并兩次請求支持服務。 但是,折疊可以與緩存一起使用-通過在運行可折疊命令之前咨詢緩存。

    摘要

    請求折疊是一個有用的工具,但是用例非常有限。 它可以顯著提高我們系統的吞吐量,并限制外部服務的負載。 崩潰可以神奇地平抑流量高峰,而不是將其散布到各處。 只要確保將其用于以極高頻率運行的命令即可。

    翻譯自: https://www.javacodegeeks.com/2014/11/batching-collapsing-requests-in-hystrix.html

    總結

    以上是生活随笔為你收集整理的Hystrix中的批量(折叠)请求的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。