kata_小规模流处理kata。 第1部分:线程池
kata
我再次為我的公司在GeeCON 2016上舉辦了編程競賽。 這次分配需要設計并根據以下要求選擇實施系統:
一個系統每秒發送大約一千個事件。 每個Event至少具有兩個屬性:
- clientId –我們期望一個客戶端每秒最多可以處理幾個事件
- UUID –全球唯一
消耗一個事件大約需要10毫秒。 設計此類流的使用者:
這些要求中沒有幾個重要的細節:
在本文中,我將指導您完成一些正確的解決方案,并嘗試一些失敗的嘗試。 您還將學習如何使用少量精確定位的指標來解決問題。
天真的順序處理
讓我們通過迭代解決這個問題。 首先,我們必須對API進行一些假設。 想象一下:
interface EventStream {void consume(EventConsumer consumer);}@FunctionalInterface interface EventConsumer {Event consume(Event event); }@Value class Event {private final Instant created = Instant.now();private final int clientId;private final UUID uuid;}典型的基于推送的API,類似于JMS。 一個重要的注意事項是EventConsumer正在阻止,這意味著直到EventConsumer消耗了前一個Event ,它才交付新的Event 。 這只是我所做的一個假設,并沒有徹底改變需求。 這也是JMS中消息偵聽器的工作方式。 天真的實現只附加了一個偵聽器,該偵聽器需要大約10毫秒才能完成:
class ClientProjection implements EventConsumer {@Overridepublic Event consume(Event event) {Sleeper.randSleep(10, 1);return event;}}當然,在現實生活中,這個使用者會在數據庫中存儲一些東西,進行遠程調用等。我在睡眠時間分配中添加了一些隨機性,以使手動測試更加實際:
class Sleeper {private static final Random RANDOM = new Random();static void randSleep(double mean, double stdDev) {final double micros = 1_000 * (mean + RANDOM.nextGaussian() * stdDev);try {TimeUnit.MICROSECONDS.sleep((long) micros);} catch (InterruptedException e) {throw new RuntimeException(e);}}}//...EventStream es = new EventStream(); //some real implementation here es.consume(new ClientProjection());它可以編譯并運行,但是為了確定未滿足要求,我們必須插入少量指標。 最重要的指標是消息消耗的延遲,以消息創建到開始處理之間的時間來衡量。 我們將為此使用Dropwizard指標 :
class ClientProjection implements EventConsumer {private final ProjectionMetrics metrics;ClientProjection(ProjectionMetrics metrics) {this.metrics = metrics;}@Overridepublic Event consume(Event event) {metrics.latency(Duration.between(event.getCreated(), Instant.now()));Sleeper.randSleep(10, 1);return event;}}提取ProjectionMetrics類以分離職責:
import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Slf4jReporter; import lombok.extern.slf4j.Slf4j;import java.time.Duration; import java.util.concurrent.TimeUnit;@Slf4j class ProjectionMetrics {private final Histogram latencyHist;ProjectionMetrics(MetricRegistry metricRegistry) {final Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry).outputTo(log).convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build();reporter.start(1, TimeUnit.SECONDS);latencyHist = metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics.class, "latency"));}void latency(Duration duration) {latencyHist.update(duration.toMillis());} }現在,當您運行樸素的解決方案時,您會Swift發現中值延遲以及99.9%的百分數無限增長:
type=HISTOGRAM, [...] count=84, min=0, max=795, mean=404.88540608274104, [...]median=414.0, p75=602.0, p95=753.0, p98=783.0, p99=795.0, p999=795.0 type=HISTOGRAM, [...] count=182, min=0, max=1688, mean=861.1706371990878, [...]median=869.0, p75=1285.0, p95=1614.0, p98=1659.0, p99=1678.0, p999=1688.0[...30 seconds later...]type=HISTOGRAM, [...] count=2947, min=14, max=26945, mean=15308.138585757424, [...]median=16150.0, p75=21915.0, p95=25978.0, p98=26556.0, p99=26670.0, p999=26945.030秒后,我們的應用程序平均會延遲15秒處理事件。 并非完全實時 。 顯然,缺少并發是任何原因。 我們的ClientProjection事件使用者大約需要10毫秒才能完成,因此每秒可以處理多達100個事件,而我們還需要一個數量級。 我們必須以某種方式擴展ClientProjection 。 而且我們甚至都沒有觸及其他要求!
天真線程池
最明顯的解決方案是從多個線程調用EventConsumer 。 最簡單的方法是利用ExecutorService :
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;class NaivePool implements EventConsumer, Closeable {private final EventConsumer downstream;private final ExecutorService executorService;NaivePool(int size, EventConsumer downstream) {this.executorService = Executors.newFixedThreadPool(size);this.downstream = downstream;}@Overridepublic Event consume(Event event) {executorService.submit(() -> downstream.consume(event));return event;}@Overridepublic void close() throws IOException {executorService.shutdown();} }我們在這里使用裝飾器模式 。 實現EventConsumer的原始ClientProjection是正確的。 但是,我們將其與EventConsumer另一個實現并發包裝。 這將使我們能夠編寫復雜的行為而無需更改ClientProjection本身。 這樣的設計促進:
- 松散耦合:各種EventConsumer彼此都不了解,可以自由組合
- 單一責任:每個人都做一份工作,然后委派給下一個組成部分
- 開放/封閉原則 :我們可以在不修改現有實現的情況下更改系統的行為。
打開/關閉原理通常通過注入策略和模板方法模式來實現。 在這里,它甚至更簡單。 整體接線如下:
MetricRegistry metricRegistry =new MetricRegistry(); ProjectionMetrics metrics =new ProjectionMetrics(metricRegistry); ClientProjection clientProjection =new ClientProjection(metrics); NaivePool naivePool =new NaivePool(10, clientProjection); EventStream es = new EventStream(); es.consume(naivePool);我們精心設計的指標表明情況確實好得多:
type=HISToOGRAM, count=838, min=1, max=422, mean=38.80768197277468, [...]median=37.0, p75=45.0, p95=51.0, p98=52.0, p99=52.0, p999=422.0 type=HISTOGRAM, count=1814, min=1, max=281, mean=47.82642776789085, [...]median=51.0, p75=57.0, p95=61.0, p98=62.0, p99=63.0, p999=65.0[...30 seconds later...]type=HISTOGRAM, count=30564, min=5, max=3838, mean=364.2904915942238, [...]median=352.0, p75=496.0, p95=568.0, p98=574.0, p99=1251.0, p999=3531.0然而,我們仍然看到延遲的規模越來越小,在30秒后,延遲達到了364毫秒。 它一直在增長,所以問題是系統的。 我們……需要……更多……指標。 請注意, NaivePool (您很快就會知道為什么它是naive )有正好有10個線程NaivePool 。 這應該足以處理數千個事件,每個事件需要10毫秒來處理。 實際上,我們需要一點額外的處理能力,以避免垃圾收集后或負載高峰時出現問題。 為了證明線程池實際上是我們的瓶頸,最好監視其內部隊列。 這需要一些工作:
class NaivePool implements EventConsumer, Closeable {private final EventConsumer downstream;private final ExecutorService executorService;NaivePool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();String name = MetricRegistry.name(ProjectionMetrics.class, "queue");Gauge<Integer> gauge = queue::size;metricRegistry.register(name, gauge);this.executorService = new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue);this.downstream = downstream;}@Overridepublic Event consume(Event event) {executorService.submit(() -> downstream.consume(event));return event;}@Overridepublic void close() throws IOException {executorService.shutdown();} }這里的想法是手動創建ThreadPoolExecutor ,以提供自定義的LinkedBlockingQueue實例。 我們稍后可以使用該隊列來監視其長度(請參閱: ExecutorService – 10個技巧 )。 Gauge將定期調用queue::size并將其報告給您需要的地方。 度量標準確認線程池大小確實是一個問題:
type=GAUGE, name=[...].queue, value=35 type=GAUGE, name=[...].queue, value=52[...30 seconds later...]type=GAUGE, name=[...].queue, value=601容納待處理任務的隊列的大小不斷增加,這會損害延遲。 線程池大小從10增加到20最終報告了不錯的結果,并且沒有停頓。 但是,我們仍然沒有解決重復項,也沒有針對同一clientId防止事件的同時修改。
晦澀的鎖定
讓我們從避免對同一clientId的事件進行并發處理開始。 如果兩個事件接連發生,并且都與同一個clientId相關,那么NaivePool將選擇它們并開始同時處理它們。 首先,我們至少通過為每個clientId設置一個Lock來發現這種情況:
@Slf4j class FailOnConcurrentModification implements EventConsumer {private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();private final EventConsumer downstream;FailOnConcurrentModification(EventConsumer downstream) {this.downstream = downstream;}@Overridepublic Event consume(Event event) {Lock lock = findClientLock(event);if (lock.tryLock()) {try {downstream.consume(event);} finally {lock.unlock();}} else {log.error("Client {} already being modified by another thread", event.getClientId());}return event;}private Lock findClientLock(Event event) {return clientLocks.computeIfAbsent(event.getClientId(),clientId -> new ReentrantLock());}}這肯定是朝錯誤的方向前進。 復雜程度不堪重負,但是運行此代碼至少表明存在問題。 事件處理管道如下所示,一個裝飾器包裝了另一個裝飾器:
ClientProjection clientProjection =new ClientProjection(new ProjectionMetrics(metricRegistry)); FailOnConcurrentModification failOnConcurrentModification =new FailOnConcurrentModification(clientProjection); NaivePool naivePool =new NaivePool(10, failOnConcurrentModification, metricRegistry); EventStream es = new EventStream();es.consume(naivePool);偶爾會彈出錯誤消息,告訴我們其他一些線程已經在處理同一clientId事件。 對于每個clientId我們關聯一個檢查的Lock ,以便確定當前是否有另一個線程不在處理該客戶端。 盡管丑陋,但實際上我們已經接近殘酷的解決方案。 而不是因為另一個線程已經在處理某個事件而無法獲取Lock時失敗,讓我們稍等一下,希望Lock可以被釋放:
@Slf4j class WaitOnConcurrentModification implements EventConsumer {private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();private final EventConsumer downstream;private final Timer lockWait;WaitOnConcurrentModification(EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;lockWait = metricRegistry.timer(MetricRegistry.name(WaitOnConcurrentModification.class, "lockWait"));}@Overridepublic Event consume(Event event) {try {final Lock lock = findClientLock(event);final Timer.Context time = lockWait.time();try {final boolean locked = lock.tryLock(1, TimeUnit.SECONDS);time.stop();if(locked) {downstream.consume(event);}} finally {lock.unlock();}} catch (InterruptedException e) {log.warn("Interrupted", e);}return event;}private Lock findClientLock(Event event) {return clientLocks.computeIfAbsent(event.getClientId(),clientId -> new ReentrantLock());}}這個想法非常相似。 但是, tryLock()失敗,它最多等待1秒,以希望釋放給定客戶端的Lock 。 如果兩個事件很快相繼發生,一個事件將獲得一個Lock并繼續執行,而另一個事件將阻止等待unlock()發生。
不僅這些代碼確實令人費解,而且還可能以許多微妙的方式被破壞。 例如,如果幾乎同一時間發生同一客戶clientId兩個事件,但是顯然是第一個事件呢? 這兩個事件將同時請求Lock ,并且我們無法保證哪個事件會首先獲得不公平的Lock ,從而可能會亂序使用事件。 肯定有更好的辦法…
專用線程
讓我們退后一步,深吸一口氣。 您如何確保事情不會同時發生? 好吧,只需使用一個線程! 事實上,這是我們一開始所做的,但吞吐量并不令人滿意。 但是我們不關心不同的clientId的并發性,我們只需要確保具有相同clientId事件始終由同一線程處理即可!
也許您會想到創建從clientId到Thread的映射? 好吧,這將過于簡單化。 我們將創建數千個線程,每個線程在大多數時間都根據需求空閑(對于給定的clientId每秒只有很少的事件)。 一個不錯的折衷方案是固定大小的線程池,每個線程負責clientId的眾所周知的子集。 這樣,兩個不同的clientId可能會終止在同一線程上,但是同一clientId將始終由同一線程處理。 如果出現同一clientId兩個事件,則它們都將被路由到同一線程,從而避免了并發處理。 實現非常簡單:
class SmartPool implements EventConsumer, Closeable {private final List<ExecutorService> threadPools;private final EventConsumer downstream;SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;List<ExecutorService> list = IntStream.range(0, size).mapToObj(i -> Executors.newSingleThreadExecutor()).collect(Collectors.toList());this.threadPools = new CopyOnWriteArrayList<>(list);}@Overridepublic void close() throws IOException {threadPools.forEach(ExecutorService::shutdown);}@Overridepublic Event consume(Event event) {final int threadIdx = event.getClientId() % threadPools.size();final ExecutorService executor = threadPools.get(threadIdx);executor.submit(() -> downstream.consume(event));return event;} }關鍵部分就在最后:
int threadIdx = event.getClientId() % threadPools.size(); ExecutorService executor = threadPools.get(threadIdx);這個簡單的算法將始終對相同的clientId使用相同的單線程ExecutorService 。 不同的ID可在同一池中結束,例如,當池大小是20 ,客戶機7 , 27 , 47等,將使用相同的線程。 但這可以,只要一個clientId始終使用同一線程即可。 此時,不需要鎖定,并且可以保證順序調用,因為同一客戶端的事件始終由同一線程執行。 旁注:每個clientId一個線程無法擴展,但是每個clientId一個角色(例如,在Akka中)是一個很好的主意,它可以簡化很多工作。
為了更加安全,我在每個線程池中插入了平均隊列大小的指標,從而使實現更長:
class SmartPool implements EventConsumer, Closeable {private final List<LinkedBlockingQueue<Runnable>> queues;private final List<ExecutorService> threadPools;private final EventConsumer downstream;SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;this.queues = IntStream.range(0, size).mapToObj(i -> new LinkedBlockingQueue<Runnable>()).collect(Collectors.toList());List<ThreadPoolExecutor> list = queues.stream().map(q -> new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q)).collect(Collectors.toList());this.threadPools = new CopyOnWriteArrayList<>(list);metricRegistry.register(MetricRegistry.name(ProjectionMetrics.class, "queue"), (Gauge<Double>) this::averageQueueLength);}private double averageQueueLength() {double totalLength =queues.stream().mapToDouble(LinkedBlockingQueue::size).sum();return totalLength / queues.size();}//...}如果您偏執狂,您甚至可以為每個隊列創建一個指標。
重復數據刪除和冪等
在分布式環境中,當生產者至少有一次保證時,接收重復事件是很常見的。 這種行為的原因不在本文討論范圍之內,但我們必須學習如何解決該問題。 一種方法是將全局唯一標識符( UUID )附加到每條消息,并確保在消費者方面不會對具有相同標識符的消息進行兩次處理。 每個Event都有這樣的UUID 。 根據我們的要求,最直接的解決方案是簡單地存儲所有可見的UUID并在到達時驗證接收到的UUID從未見過。 按原樣使用ConcurrentHashMap<UUID, UUID> (JDK中沒有ConcurrentHashSet )會導致內存泄漏,因為隨著時間的推移,我們將不斷積累越來越多的ID。 這就是為什么我們僅在最近10秒內查找重復項。 從技術上講,您可以擁有ConcurrentHashMap<UUID, Instant> ,當遇到該問題時,它會從UUID映射到時間戳。 通過使用后臺線程,我們可以刪除10秒鐘以上的元素。 但是,如果您是快樂的Guava用戶,則具有聲明性驅逐策略的Cache<UUID, UUID>將達到目的:
import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder;import java.util.UUID; import java.util.concurrent.TimeUnit;class IgnoreDuplicates implements EventConsumer {private final EventConsumer downstream;private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();IgnoreDuplicates(EventConsumer downstream) {this.downstream = downstream;}@Overridepublic Event consume(Event event) {final UUID uuid = event.getUuid();if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {return downstream.consume(event);} else {return event;}} }為了保證生產安全,我至少可以想到兩個指標可能會有用:緩存大小和發現的重復項數量。 讓我們也插入以下指標:
class IgnoreDuplicates implements EventConsumer {private final EventConsumer downstream;private final Meter duplicates;private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();IgnoreDuplicates(EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;duplicates = metricRegistry.meter(MetricRegistry.name(IgnoreDuplicates.class, "duplicates"));metricRegistry.register(MetricRegistry.name(IgnoreDuplicates.class, "cacheSize"), (Gauge<Long>) seenUuids::size);}@Overridepublic Event consume(Event event) {final UUID uuid = event.getUuid();if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {return downstream.consume(event);} else {duplicates.mark();return event;}} }最終,我們擁有了構建解決方案的所有要素。 這個想法是由彼此包裝的EventConsumer實例組成管道:
您可以選擇在SmartPool和ClientProjection之間放置FailOnConcurrentModification步驟,以提高安全性(設計上不應進行并發修改):
ClientProjection clientProjection =new ClientProjection(new ProjectionMetrics(metricRegistry)); FailOnConcurrentModification concurrentModification =new FailOnConcurrentModification(clientProjection); SmartPool smartPool =new SmartPool(12, concurrentModification, metricRegistry); IgnoreDuplicates withoutDuplicates =new IgnoreDuplicates(smartPool, metricRegistry); EventStream es = new EventStream(); es.consume(withoutDuplicates);我們花了很多工作才能提出相對簡單且結構合理的解決方案(我希望您同意)。 最后,解決并發問題的最佳方法是……避免并發,并在一個線程中運行受競爭條件約束的代碼。 這也是Akka actor(每個actor處理單個消息)和RxJava( Subscriber處理的一條消息)背后的思想。 在下一部分中,我們將在RxJava中看到聲明式解決方案。
翻譯自: https://www.javacodegeeks.com/2016/10/small-scale-stream-processing-kata-part-1-thread-pools.html
kata
總結
以上是生活随笔為你收集整理的kata_小规模流处理kata。 第1部分:线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 沙白瓜的功效与作用 沙白瓜的功效有哪些
- 下一篇: lucene使用3.0.3_Jirase