日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kata_小规模流处理kata。 第1部分:线程池

發布時間:2023/12/3 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kata_小规模流处理kata。 第1部分:线程池 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

kata

我再次為我的公司在GeeCON 2016上舉辦了編程競賽。 這次分配需要設計并根據以下要求選擇實施系統:

一個系統每秒發送大約一千個事件。 每個Event至少具有兩個屬性:

  • clientId –我們期望一個客戶端每秒最多可以處理幾個事件
  • UUID –全球唯一

消耗一個事件大約需要10毫秒。 設計此類流的使用者:

  • 允許實時處理事件
  • 與一個客戶端有關的事件應按順序進行處理,即,您不能并行處理同一clientId事件
  • 如果10秒鐘內出現重復的UUID ,請將其刪除。 假設10秒鐘后不會出現重復
  • 這些要求中沒有幾個重要的細節:

  • 1000個事件/秒和10毫秒消耗一個事件。 顯然,我們至少需要10個并發使用者才能實時消費。
  • 事件具有自然的聚合ID( clientId )。 在一秒鐘內,我們可以為給定的客戶端預期一些事件,并且不允許我們同時或無序處理它們。
  • 我們必須以某種方式忽略重復的消息,最有可能的是通過記住最近10秒鐘內的所有唯一ID。 這使大約一萬個UUID得以臨時保留。
  • 在本文中,我將指導您完成一些正確的解決方案,并嘗試一些失敗的嘗試。 您還將學習如何使用少量精確定位的指標來解決問題。

    天真的順序處理

    讓我們通過迭代解決這個問題。 首先,我們必須對API進行一些假設。 想象一下:

    interface EventStream {void consume(EventConsumer consumer);}@FunctionalInterface interface EventConsumer {Event consume(Event event); }@Value class Event {private final Instant created = Instant.now();private final int clientId;private final UUID uuid;}

    典型的基于推送的API,類似于JMS。 一個重要的注意事項是EventConsumer正在阻止,這意味著直到EventConsumer消耗了前一個Event ,它才交付新的Event 。 這只是我所做的一個假設,并沒有徹底改變需求。 這也是JMS中消息偵聽器的工作方式。 天真的實現只附加了一個偵聽器,該偵聽器需要大約10毫秒才能完成:

    class ClientProjection implements EventConsumer {@Overridepublic Event consume(Event event) {Sleeper.randSleep(10, 1);return event;}}

    當然,在現實生活中,這個使用者會在數據庫中存儲一些東西,進行遠程調用等。我在睡眠時間分配中添加了一些隨機性,以使手動測試更加實際:

    class Sleeper {private static final Random RANDOM = new Random();static void randSleep(double mean, double stdDev) {final double micros = 1_000 * (mean + RANDOM.nextGaussian() * stdDev);try {TimeUnit.MICROSECONDS.sleep((long) micros);} catch (InterruptedException e) {throw new RuntimeException(e);}}}//...EventStream es = new EventStream(); //some real implementation here es.consume(new ClientProjection());

    它可以編譯并運行,但是為了確定未滿足要求,我們必須插入少量指標。 最重要的指標是消息消耗的延遲,以消息創建到開始處理之間的時間來衡量。 我們將為此使用Dropwizard指標 :

    class ClientProjection implements EventConsumer {private final ProjectionMetrics metrics;ClientProjection(ProjectionMetrics metrics) {this.metrics = metrics;}@Overridepublic Event consume(Event event) {metrics.latency(Duration.between(event.getCreated(), Instant.now()));Sleeper.randSleep(10, 1);return event;}}

    提取ProjectionMetrics類以分離職責:

    import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Slf4jReporter; import lombok.extern.slf4j.Slf4j;import java.time.Duration; import java.util.concurrent.TimeUnit;@Slf4j class ProjectionMetrics {private final Histogram latencyHist;ProjectionMetrics(MetricRegistry metricRegistry) {final Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry).outputTo(log).convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build();reporter.start(1, TimeUnit.SECONDS);latencyHist = metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics.class, "latency"));}void latency(Duration duration) {latencyHist.update(duration.toMillis());} }

    現在,當您運行樸素的解決方案時,您會Swift發現中值延遲以及99.9%的百分數無限增長:

    type=HISTOGRAM, [...] count=84, min=0, max=795, mean=404.88540608274104, [...]median=414.0, p75=602.0, p95=753.0, p98=783.0, p99=795.0, p999=795.0 type=HISTOGRAM, [...] count=182, min=0, max=1688, mean=861.1706371990878, [...]median=869.0, p75=1285.0, p95=1614.0, p98=1659.0, p99=1678.0, p999=1688.0[...30 seconds later...]type=HISTOGRAM, [...] count=2947, min=14, max=26945, mean=15308.138585757424, [...]median=16150.0, p75=21915.0, p95=25978.0, p98=26556.0, p99=26670.0, p999=26945.0

    30秒后,我們的應用程序平均會延遲15秒處理事件。 并非完全實時 。 顯然,缺少并發是任何原因。 我們的ClientProjection事件使用者大約需要10毫秒才能完成,因此每秒可以處理多達100個事件,而我們還需要一個數量級。 我們必須以某種方式擴展ClientProjection 。 而且我們甚至都沒有觸及其他要求!

    天真線程池

    最明顯的解決方案是從多個線程調用EventConsumer 。 最簡單的方法是利用ExecutorService :

    import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;class NaivePool implements EventConsumer, Closeable {private final EventConsumer downstream;private final ExecutorService executorService;NaivePool(int size, EventConsumer downstream) {this.executorService = Executors.newFixedThreadPool(size);this.downstream = downstream;}@Overridepublic Event consume(Event event) {executorService.submit(() -> downstream.consume(event));return event;}@Overridepublic void close() throws IOException {executorService.shutdown();} }

    我們在這里使用裝飾器模式 。 實現EventConsumer的原始ClientProjection是正確的。 但是,我們將其與EventConsumer另一個實現并發包裝。 這將使我們能夠編寫復雜的行為而無需更改ClientProjection本身。 這樣的設計促進:

    • 松散耦合:各種EventConsumer彼此都不了解,可以自由組合
    • 單一責任:每個人都做一份工作,然后委派給下一個組成部分
    • 開放/封閉原則 :我們可以在不修改現有實現的情況下更改系統的行為。

    打開/關閉原理通常通過注入策略和模板方法模式來實現。 在這里,它甚至更簡單。 整體接線如下:

    MetricRegistry metricRegistry =new MetricRegistry(); ProjectionMetrics metrics =new ProjectionMetrics(metricRegistry); ClientProjection clientProjection =new ClientProjection(metrics); NaivePool naivePool =new NaivePool(10, clientProjection); EventStream es = new EventStream(); es.consume(naivePool);

    我們精心設計的指標表明情況確實好得多:

    type=HISToOGRAM, count=838, min=1, max=422, mean=38.80768197277468, [...]median=37.0, p75=45.0, p95=51.0, p98=52.0, p99=52.0, p999=422.0 type=HISTOGRAM, count=1814, min=1, max=281, mean=47.82642776789085, [...]median=51.0, p75=57.0, p95=61.0, p98=62.0, p99=63.0, p999=65.0[...30 seconds later...]type=HISTOGRAM, count=30564, min=5, max=3838, mean=364.2904915942238, [...]median=352.0, p75=496.0, p95=568.0, p98=574.0, p99=1251.0, p999=3531.0

    然而,我們仍然看到延遲的規模越來越小,在30秒后,延遲達到了364毫秒。 它一直在增長,所以問題是系統的。 我們……需要……更多……指標。 請注意, NaivePool (您很快就會知道為什么它是naive )有正好有10個線程NaivePool 。 這應該足以處理數千個事件,每個事件需要10毫秒來處理。 實際上,我們需要一點額外的處理能力,以避免垃圾收集后或負載高峰時出現問題。 為了證明線程池實際上是我們的瓶頸,最好監視其內部隊列。 這需要一些工作:

    class NaivePool implements EventConsumer, Closeable {private final EventConsumer downstream;private final ExecutorService executorService;NaivePool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();String name = MetricRegistry.name(ProjectionMetrics.class, "queue");Gauge<Integer> gauge = queue::size;metricRegistry.register(name, gauge);this.executorService = new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue);this.downstream = downstream;}@Overridepublic Event consume(Event event) {executorService.submit(() -> downstream.consume(event));return event;}@Overridepublic void close() throws IOException {executorService.shutdown();} }

    這里的想法是手動創建ThreadPoolExecutor ,以提供自定義的LinkedBlockingQueue實例。 我們稍后可以使用該隊列來監視其長度(請參閱: ExecutorService – 10個技巧 )。 Gauge將定期調用queue::size并將其報告給您需要的地方。 度量標準確認線程池大小確實是一個問題:

    type=GAUGE, name=[...].queue, value=35 type=GAUGE, name=[...].queue, value=52[...30 seconds later...]type=GAUGE, name=[...].queue, value=601

    容納待處理任務的隊列的大小不斷增加,這會損害延遲。 線程池大小從10增加到20最終報告了不錯的結果,并且沒有停頓。 但是,我們仍然沒有解決重復項,也沒有針對同一clientId防止事件的同時修改。

    晦澀的鎖定

    讓我們從避免對同一clientId的事件進行并發處理開始。 如果兩個事件接連發生,并且都與同一個clientId相關,那么NaivePool將選擇它們并開始同時處理它們。 首先,我們至少通過為每個clientId設置一個Lock來發現這種情況:

    @Slf4j class FailOnConcurrentModification implements EventConsumer {private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();private final EventConsumer downstream;FailOnConcurrentModification(EventConsumer downstream) {this.downstream = downstream;}@Overridepublic Event consume(Event event) {Lock lock = findClientLock(event);if (lock.tryLock()) {try {downstream.consume(event);} finally {lock.unlock();}} else {log.error("Client {} already being modified by another thread", event.getClientId());}return event;}private Lock findClientLock(Event event) {return clientLocks.computeIfAbsent(event.getClientId(),clientId -> new ReentrantLock());}}

    這肯定是朝錯誤的方向前進。 復雜程度不堪重負,但是運行此代碼至少表明存在問題。 事件處理管道如下所示,一個裝飾器包裝了另一個裝飾器:

    ClientProjection clientProjection =new ClientProjection(new ProjectionMetrics(metricRegistry)); FailOnConcurrentModification failOnConcurrentModification =new FailOnConcurrentModification(clientProjection); NaivePool naivePool =new NaivePool(10, failOnConcurrentModification, metricRegistry); EventStream es = new EventStream();es.consume(naivePool);

    偶爾會彈出錯誤消息,告訴我們其他一些線程已經在處理同一clientId事件。 對于每個clientId我們關聯一個檢查的Lock ,以便確定當前是否有另一個線程不在處理該客戶端。 盡管丑陋,但實際上我們已經接近殘酷的解決方案。 而不是因為另一個線程已經在處理某個事件而無法獲取Lock時失敗,讓我們稍等一下,希望Lock可以被釋放:

    @Slf4j class WaitOnConcurrentModification implements EventConsumer {private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();private final EventConsumer downstream;private final Timer lockWait;WaitOnConcurrentModification(EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;lockWait = metricRegistry.timer(MetricRegistry.name(WaitOnConcurrentModification.class, "lockWait"));}@Overridepublic Event consume(Event event) {try {final Lock lock = findClientLock(event);final Timer.Context time = lockWait.time();try {final boolean locked = lock.tryLock(1, TimeUnit.SECONDS);time.stop();if(locked) {downstream.consume(event);}} finally {lock.unlock();}} catch (InterruptedException e) {log.warn("Interrupted", e);}return event;}private Lock findClientLock(Event event) {return clientLocks.computeIfAbsent(event.getClientId(),clientId -> new ReentrantLock());}}

    這個想法非常相似。 但是, tryLock()失敗,它最多等待1秒,以希望釋放給定客戶端的Lock 。 如果兩個事件很快相繼發生,一個事件將獲得一個Lock并繼續執行,而另一個事件將阻止等待unlock()發生。

    不僅這些代碼確實令人費解,而且還可能以許多微妙的方式被破壞。 例如,如果幾乎同一時間發生同一客戶clientId兩個事件,但是顯然是第一個事件呢? 這兩個事件將同時請求Lock ,并且我們無法保證哪個事件會首先獲得不公平的Lock ,從而可能會亂序使用事件。 肯定有更好的辦法…

    專用線程

    讓我們退后一步,深吸一口氣。 您如何確保事情不會同時發生? 好吧,只需使用一個線程! 事實上,這是我們一開始所做的,但吞吐量并不令人滿意。 但是我們不關心不同的clientId的并發性,我們只需要確保具有相同clientId事件始終由同一線程處理即可!

    也許您會想到創建從clientId到Thread的映射? 好吧,這將過于簡單化。 我們將創建數千個線程,每個線程在大多數時間都根據需求空閑(對于給定的clientId每秒只有很少的事件)。 一個不錯的折衷方案是固定大小的線程池,每個線程負責clientId的眾所周知的子集。 這樣,兩個不同的clientId可能會終止在同一線程上,但是同一clientId將始終由同一線程處理。 如果出現同一clientId兩個事件,則它們都將被路由到同一線程,從而避免了并發處理。 實現非常簡單:

    class SmartPool implements EventConsumer, Closeable {private final List<ExecutorService> threadPools;private final EventConsumer downstream;SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;List<ExecutorService> list = IntStream.range(0, size).mapToObj(i -> Executors.newSingleThreadExecutor()).collect(Collectors.toList());this.threadPools = new CopyOnWriteArrayList<>(list);}@Overridepublic void close() throws IOException {threadPools.forEach(ExecutorService::shutdown);}@Overridepublic Event consume(Event event) {final int threadIdx = event.getClientId() % threadPools.size();final ExecutorService executor = threadPools.get(threadIdx);executor.submit(() -> downstream.consume(event));return event;} }

    關鍵部分就在最后:

    int threadIdx = event.getClientId() % threadPools.size(); ExecutorService executor = threadPools.get(threadIdx);

    這個簡單的算法將始終對相同的clientId使用相同的單線程ExecutorService 。 不同的ID可在同一池中結束,例如,當池大小是20 ,客戶機7 , 27 , 47等,將使用相同的線程。 但這可以,只要一個clientId始終使用同一線程即可。 此時,不需要鎖定,并且可以保證順序調用,因為同一客戶端的事件始終由同一線程執行。 旁注:每個clientId一個線程無法擴展,但是每個clientId一個角色(例如,在Akka中)是一個很好的主意,它可以簡化很多工作。

    為了更加安全,我在每個線程池中插入了平均隊列大小的指標,從而使實現更長:

    class SmartPool implements EventConsumer, Closeable {private final List<LinkedBlockingQueue<Runnable>> queues;private final List<ExecutorService> threadPools;private final EventConsumer downstream;SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;this.queues = IntStream.range(0, size).mapToObj(i -> new LinkedBlockingQueue<Runnable>()).collect(Collectors.toList());List<ThreadPoolExecutor> list = queues.stream().map(q -> new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q)).collect(Collectors.toList());this.threadPools = new CopyOnWriteArrayList<>(list);metricRegistry.register(MetricRegistry.name(ProjectionMetrics.class, "queue"), (Gauge<Double>) this::averageQueueLength);}private double averageQueueLength() {double totalLength =queues.stream().mapToDouble(LinkedBlockingQueue::size).sum();return totalLength / queues.size();}//...}

    如果您偏執狂,您甚至可以為每個隊列創建一個指標。

    重復數據刪除和冪等

    在分布式環境中,當生產者至少有一次保證時,接收重復事件是很常見的。 這種行為的原因不在本文討論范圍之內,但我們必須學習如何解決該問題。 一種方法是將全局唯一標識符( UUID )附加到每條消息,并確保在消費者方面不會對具有相同標識符的消息進行兩次處理。 每個Event都有這樣的UUID 。 根據我們的要求,最直接的解決方案是簡單地存儲所有可見的UUID并在到達時驗證接收到的UUID從未見過。 按原樣使用ConcurrentHashMap<UUID, UUID> (JDK中沒有ConcurrentHashSet )會導致內存泄漏,因為隨著時間的推移,我們將不斷積累越來越多的ID。 這就是為什么我們僅在最近10秒內查找重復項。 從技術上講,您可以擁有ConcurrentHashMap<UUID, Instant> ,當遇到該問題時,它會從UUID映射到時間戳。 通過使用后臺線程,我們可以刪除10秒鐘以上的元素。 但是,如果您是快樂的Guava用戶,則具有聲明性驅逐策略的Cache<UUID, UUID>將達到目的:

    import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder;import java.util.UUID; import java.util.concurrent.TimeUnit;class IgnoreDuplicates implements EventConsumer {private final EventConsumer downstream;private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();IgnoreDuplicates(EventConsumer downstream) {this.downstream = downstream;}@Overridepublic Event consume(Event event) {final UUID uuid = event.getUuid();if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {return downstream.consume(event);} else {return event;}} }

    為了保證生產安全,我至少可以想到兩個指標可能會有用:緩存大小和發現的重復項數量。 讓我們也插入以下指標:

    class IgnoreDuplicates implements EventConsumer {private final EventConsumer downstream;private final Meter duplicates;private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();IgnoreDuplicates(EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;duplicates = metricRegistry.meter(MetricRegistry.name(IgnoreDuplicates.class, "duplicates"));metricRegistry.register(MetricRegistry.name(IgnoreDuplicates.class, "cacheSize"), (Gauge<Long>) seenUuids::size);}@Overridepublic Event consume(Event event) {final UUID uuid = event.getUuid();if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {return downstream.consume(event);} else {duplicates.mark();return event;}} }

    最終,我們擁有了構建解決方案的所有要素。 這個想法是由彼此包裝的EventConsumer實例組成管道:

  • 首先,我們應用IgnoreDuplicates拒絕重復項
  • 然后,我們調用SmartPool ,它將始終將給定的clientId到同一線程,并在該線程中執行下一階段
  • 最后,調用真正執行業務邏輯的ClientProjection 。
  • 您可以選擇在SmartPool和ClientProjection之間放置FailOnConcurrentModification步驟,以提高安全性(設計上不應進行并發修改):

    ClientProjection clientProjection =new ClientProjection(new ProjectionMetrics(metricRegistry)); FailOnConcurrentModification concurrentModification =new FailOnConcurrentModification(clientProjection); SmartPool smartPool =new SmartPool(12, concurrentModification, metricRegistry); IgnoreDuplicates withoutDuplicates =new IgnoreDuplicates(smartPool, metricRegistry); EventStream es = new EventStream(); es.consume(withoutDuplicates);

    我們花了很多工作才能提出相對簡單且結構合理的解決方案(我希望您同意)。 最后,解決并發問題的最佳方法是……避免并發,并在一個線程中運行受競爭條件約束的代碼。 這也是Akka actor(每個actor處理單個消息)和RxJava( Subscriber處理的一條消息)背后的思想。 在下一部分中,我們將在RxJava中看到聲明式解決方案。

    翻譯自: https://www.javacodegeeks.com/2016/10/small-scale-stream-processing-kata-part-1-thread-pools.html

    kata

    總結

    以上是生活随笔為你收集整理的kata_小规模流处理kata。 第1部分:线程池的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。