Hystrix中的批量(折叠)请求
Hystrix具有折疊(或批處理)請求的高級功能。 如果兩個或多個命令同時運行相似的請求,Hystrix可以將它們組合在一起,運行一個批處理請求,并將拆分結(jié)果分派回所有命令。 首先讓我們看看Hystrix如何工作而不會崩潰。 假設(shè)我們有一個StockPrice給定Ticker StockPrice的服務(wù):
為了方便起見, StockPriceGateway核心實現(xiàn)必須提供loadAll()批處理方法,而實現(xiàn)load()方法則是為了方便。 因此,我們的網(wǎng)關(guān)能夠批量加載多個價格(例如,以減少延遲或網(wǎng)絡(luò)協(xié)議開銷),但是目前我們不使用此功能,始終一次加載一個股票的價格:
class StockPriceCommand extends HystrixCommand<StockPrice> {private final StockPriceGateway gateway;private final Ticker stock;StockPriceCommand(StockPriceGateway gateway, Ticker stock) {super(HystrixCommandGroupKey.Factory.asKey("Stock"));this.gateway = gateway;this.stock = stock;}@Overrideprotected StockPrice run() throws Exception {return gateway.load(stock);} }這樣的命令將始終為每個Ticker調(diào)用StockPriceGateway.load() ,如以下測試所示:
class StockPriceCommandTest extends Specification {def gateway = Mock(StockPriceGateway)def 'should fetch price from external service'() {given:gateway.load(TickerExamples.any()) >> StockPriceExamples.any()def command = new StockPriceCommand(gateway, TickerExamples.any())when:def price = command.execute()then:price == StockPriceExamples.any()}def 'should call gateway exactly once when running Hystrix command'() {given:def command = new StockPriceCommand(gateway, TickerExamples.any())when:command.execute()then:1 * gateway.load(TickerExamples.any())}def 'should call gateway twice when command executed two times'() {given:def commandOne = new StockPriceCommand(gateway, TickerExamples.any())def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())when:commandOne.execute()commandTwo.execute()then:2 * gateway.load(TickerExamples.any())}def 'should call gateway twice even when executed in parallel'() {given:def commandOne = new StockPriceCommand(gateway, TickerExamples.any())def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:2 * gateway.load(TickerExamples.any())}}如果您不了解Hystrix,則通過將外部調(diào)用包裝在命令中可以獲得許多功能,例如超時,斷路器等。但這不是本文的重點。 看一下最后兩個測試:當兩次,順序或并行( queue() )兩次詢問任意行情的價格時,我們的外部gateway也被兩次調(diào)用。 上一次測試特別有趣–我們幾乎同時要求相同的報價,但Hystrix不能弄清楚。 這兩個命令是完全獨立的,將在不同的線程中執(zhí)行,彼此之間一無所知-即使它們幾乎同時運行。
折疊就是找到類似的請求并將其組合。 批處理(我將這個術(shù)語與崩潰互換使用)不會自動發(fā)生,并且需要一些編碼。 但是首先讓我們看看它的行為:
def 'should collapse two commands executed concurrently for the same stock ticker'() {given:def anyTicker = TickerExamples.any()def tickers = [anyTicker] as Setand:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, anyTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:0 * gateway.load(_)1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any()) }def 'should collapse two commands executed concurrently for the different stock tickers'() {given:def anyTicker = TickerExamples.any()def otherTicker = TickerExamples.other()def tickers = [anyTicker, otherTicker] as Setand:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any(),otherTicker, StockPriceExamples.other()) }def 'should correctly map collapsed response into individual requests'() {given:def anyTicker = TickerExamples.any()def otherTicker = TickerExamples.other()def tickers = [anyTicker, otherTicker] as Setgateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any(),otherTicker, StockPriceExamples.other())and:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:def anyPrice = futureOne.get()def otherPrice = futureTwo.get()then:anyPrice == StockPriceExamples.any()otherPrice == StockPriceExamples.other() }第一個測試證明,不是兩次調(diào)用load() ,而是幾乎一次調(diào)用loadAll() 。 還要注意,由于我們要求相同的Ticker (來自兩個不同的線程),因此loadAll()僅請求一個代碼。 第二個測試顯示兩個并發(fā)請求,兩個不同的行情收錄器被折疊為一個批處理調(diào)用。 第三次測試可確保我們?nèi)阅軐γ總€單獨的請求得到正確的響應(yīng)。 相反,延長HystrixCommand我們必須擴展更加復(fù)雜HystrixCollapser 。 現(xiàn)在是時候看到StockTickerPriceCollapsedCommand實現(xiàn),它無縫替換了StockPriceCommand :
class StockTickerPriceCollapsedCommand extends HystrixCollapser<ImmutableMap<Ticker, StockPrice>, StockPrice, Ticker> {private final StockPriceGateway gateway;private final Ticker stock;StockTickerPriceCollapsedCommand(StockPriceGateway gateway, Ticker stock) {super(HystrixCollapser.Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Stock")).andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));this.gateway = gateway;this.stock = stock;}@Overridepublic Ticker getRequestArgument() {return stock;}@Overrideprotected HystrixCommand<ImmutableMap<Ticker, StockPrice>> createCommand(Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {final Set<Ticker> stocks = collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(toSet());return new StockPricesBatchCommand(gateway, stocks);}@Overrideprotected void mapResponseToRequests(ImmutableMap<Ticker, StockPrice> batchResponse, Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {collapsedRequests.forEach(request -> {final Ticker ticker = request.getArgument();final StockPrice price = batchResponse.get(ticker);request.setResponse(price);});}}這里有很多事情要做,所以讓我們逐步回顧StockTickerPriceCollapsedCommand 。 前三種通用類型:
- BatchReturnType (在我們的示例中為ImmutableMap<Ticker, StockPrice> )是批處理命令響應(yīng)的類型。 如您將在后面看到的那樣,崩潰程序會將多個小命令變成批處理命令。 這是該批處理命令的響應(yīng)的類型。 請注意,它與StockPriceGateway.loadAll()類型相同。
- ResponseType ( StockPrice )是要折疊的每個命令的類型。 在我們的例子中,我們正在折疊HystrixCommand<StockPrice> 。 稍后,我們將BatchReturnType值BatchReturnType為多個StockPrice 。
- RequestArgumentType ( Ticker )是我們將要折疊(批處理)的每個命令的輸入。 當多個命令一起批處理時,我們最終將所有這些替換為一個批處理命令。 此命令應(yīng)接收所有單個請求,以便執(zhí)行一個批處理請求。
withTimerDelayInMilliseconds(100)將在稍后說明。 createCommand()創(chuàng)建一個批處理命令。 此命令應(yīng)替換所有單個命令并執(zhí)行批處理邏輯。 在我們的情況下,我們不會進行多次單獨的load()調(diào)用:
class StockPricesBatchCommand extends HystrixCommand<ImmutableMap<Ticker, StockPrice>> {private final StockPriceGateway gateway;private final Set<Ticker> stocks;StockPricesBatchCommand(StockPriceGateway gateway, Set<Ticker> stocks) {super(HystrixCommandGroupKey.Factory.asKey("Stock"));this.gateway = gateway;this.stocks = stocks;}@Overrideprotected ImmutableMap<Ticker, StockPrice> run() throws Exception {return gateway.loadAll(stocks);} }此類與StockPriceCommand之間的唯一區(qū)別是,它需要一堆Ticker并返回所有價格。 Hystrix將收集幾個StockTickerPriceCollapsedCommand實例,一旦它具有足夠的StockTickerPriceCollapsedCommand (稍后再介紹),它將創(chuàng)建一個StockPriceCommand 。 希望這很清楚,因為mapResponseToRequests()涉及的更多。 折疊后的StockPricesBatchCommand完成后,我們必須以某種方式拆分批處理響應(yīng),并將回復(fù)傳達回各個命令,而不會崩潰。 從這個角度來看, mapResponseToRequests()實現(xiàn)非常簡單:我們收到批處理響應(yīng)和包裝CollapsedRequest<StockPrice, Ticker>的集合。 現(xiàn)在,我們必須遍歷所有正在等待的單個請求并完成它們( setResponse() )。 如果我們不完成某些請求,它們將無限期掛起并最終超時。
怎么運行的
這是描述如何實現(xiàn)折疊的正確時機。 我之前說過崩潰是在同時發(fā)生兩個請求時發(fā)生的。 沒有一樣的時間了 。 實際上,當?shù)谝粋€可折疊請求進入時,Hystrix會啟動一個計時器。 在我們的示例中,我們將其設(shè)置為100毫秒。 在此期間,我們的命令被掛起,等待其他命令加入。 在此可配置時間段之后,Hystrix將調(diào)用createCommand() ,收集所有請求鍵(通過調(diào)用getRequestArgument() )并運行它。 批處理命令完成后,它將使我們將結(jié)果分發(fā)給所有等待中的單個命令。 如果我們擔心創(chuàng)建龐大的批處理,也可以限制已折疊請求的數(shù)量–另一方面,在此短時間內(nèi)可以容納多少并發(fā)請求?
用例和缺點
請求折疊應(yīng)在承受高負載(請求頻率很高)的系統(tǒng)中使用。 如果每個折疊時間窗口(在示例中為100毫秒)僅收到一個請求,則折疊只會增加開銷。 這是因為每次您調(diào)用可折疊命令時,它都必須等待,以防萬一其他命令想要加入并形成批處理。 僅當至少折疊了幾個命令時,這才有意義。 節(jié)省網(wǎng)絡(luò)等待時間和/或更好地利用合作者中的資源可以平衡浪費的等待時間(與單個呼叫相比,批處理請求通常要快得多)。 但是請記住,折疊是一把雙刃劍,在特定情況下很有用。
最后要記住的一件事–為了使用請求折疊,您需要在try-finally塊中使用HystrixRequestContext.initializeContext()和shutdown() :
HystrixRequestContext context = HystrixRequestContext.initializeContext(); try {//... } finally {context.shutdown(); }崩潰與緩存
您可能會認為可以通過適當?shù)木彺鎭泶姹罎ⅰ?這不是真的。 在以下情況下使用緩存:
另一方面,折疊不會強制數(shù)據(jù)的局部性(1),它總是命中實際服務(wù),并且永遠不會返回陳舊的數(shù)據(jù)(2)。 最后,如果我們從多個線程中請求相同的資源,我們將僅調(diào)用一次備份服務(wù)(3)。 在進行緩存的情況下,除非您的緩存真的很聰明,否則兩個線程將獨立地發(fā)現(xiàn)緩存中沒有給定的資源,并兩次請求支持服務(wù)。 但是,折疊可以與緩存一起使用-通過在運行可折疊命令之前咨詢緩存。
摘要
請求折疊是一個有用的工具,但是用例非常有限。 它可以顯著提高我們系統(tǒng)的吞吐量,并限制外部服務(wù)的負載。 崩潰可以神奇地平抑流量高峰,而不是將其散布到各處。 只要確保將其用于以極高頻率運行的命令即可。
翻譯自: https://www.javacodegeeks.com/2014/11/batching-collapsing-requests-in-hystrix.html
總結(jié)
以上是生活随笔為你收集整理的Hystrix中的批量(折叠)请求的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Data JPA教程:简介
- 下一篇: Drools和jBPM KIE Ap