日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

kafka实现异步发送_Kafka 异步消息也会阻塞?记一次 Dubbo 频繁超时排查过程

發布時間:2023/11/27 生活经验 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka实现异步发送_Kafka 异步消息也会阻塞?记一次 Dubbo 频繁超时排查过程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

線上某服務 A 調用服務 B 接口完成一次交易,一次晚上的生產變更之后,系統監控發現服務 B 接口頻繁超時,后續甚至返回線程池耗盡錯誤 Thread pool is EXHAUSTED。因為服務 B 依賴外部接口,剛開始誤以為外部接口延時導致,所以臨時增加服務 B dubbo 線程池線程數量。配置變更之后,重啟服務,服務恢復正常。一段時間之后,服務 B 再次返回線程池耗盡錯誤。這次深入排查問題之后,才發現 Kafka 異步發送消息阻塞了 dubbo 線程,從而導致調用超時。

一、問題分析

Dubbo 2.6.5,Kafak maven 0.8.0-beta1

服務 A 調用服務 B,收到如下錯誤:

2019-08-30 09:14:52,311 WARN method [%f [DUBBO] Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-xxxx, Pool Size: 1000 (active: 1000, core: 1000, max: 1000, largest: 1000), Task: 6491 (completed: 5491), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://xxxx!, dubbo version: 2.6.0, current host: 127.0.0.1

可以看到當前 dubbo 線程池已經滿載運行,不能再接受新的調用。正常情況下 dubbo 線程可以很快完成任務,然后歸還到線程池中。由于線程執行的任務發生阻塞,消費者端調用超時。而服務提供者端由于已有線程被阻塞,線程池必須不斷創建新線程處理任務,直到線程數量達到最大數量,系統返回 Thread pool is EXHAUSTED

線程任務長時間被阻塞可能原因有:

  • 頻繁的 fullgc,導致系統暫停。
  • 調用某些阻塞 API,如 socket 連接未設置超時時間導致阻塞。
  • 系統內部死鎖

通過分析系統堆棧 dump 情況,果然發現所有 dubbo 線程都處于 WATTING 狀態。

下圖為應用堆棧 dump 日志:

從堆棧日志可以看到 dubbo 線程最后阻塞在 LinkedBlockingQueue#put ,而該阻塞發生在 Kafka 發送消息方法內。

這里服務 B 需要使用 Kafka 發送監控消息,為了消息發送不影響主業務,這里使用 Kafka 異步發送消息。由于 Kafka 服務端最近更換了對外的端口,而服務 B Kafka 配置未及時變更。最后服務 B 修改配置,服務重新啟動,該問題得以解決。

二、Kafka 異步模式

下面分析 Kafka 異步發送消息阻塞的實際原因。

0.8.0 Kafka 默認使用同步模式發送消息,異步發送消息需要設置producer.type=async屬性。同步模式需要等待 Kafka 將消息發送到消息隊列,這個過程當然會阻塞主線程。而異步模式最大的優點在于無需要等待 Kafka 這個發送過程。

原本認為這里的異步是使用子線程去運行任務,但是 Kafka 異步模式并非這樣。查看 Kafka 官方文檔producer,可以看到對異步模式描述。

Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer has an asynchronous mode that accumulates data in memory and sends out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 100 messages or 5 seconds). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. Since this buffering happens in the client it obviously reduces the durability as any data buffered in memory and not yet sent will be lost in the event of a producer crash.

從上我們可以看到,Kafka 異步模式將會把多條消息打包一塊批量發送到服務端。這種模式將會先把消息放到內存隊列中,直到消息到達一定數量(默認為 200)或者等待時間超限(默認為 5000ms)。

這么做最大好處在于提高消息發送的吞吐量,減少網絡 I/O。當然這么做也存在明顯劣勢,如果生產者宕機,在內存中還未發送消息可能就會丟失。

下面從 kafka 源碼分析這個阻塞過程。

三、Kafka 源碼解析

Kafka 消息發送端采用如下配置:

Properties props = new Properties();props.put("metadata.broker.list", "localhost:9092");// 選擇異步發送props.put("producer.type", "async");props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("queue.buffering.max.messages","1");props.put("batch.num.messages","1");Producer<Integer, String> producer= new Producer(new ProducerConfig(props));producer.send(new KeyedMessage("test", "hello world"));

這里設置 producer.type=async,從而使 Kafka 異步發送消息。

send 方法源碼如下

ps: 這個版本 Kafka 源碼采用 Scala 編寫,不過源碼還是比較簡單,比較容易閱讀。

def send(messages: KeyedMessage[K,V]*) {if (hasShutdown.get)throw new ProducerClosedExceptionrecordStats(messages)sync match {case true => eventHandler.handle(messages)// 由于  producer.type=async 異步發送case false => asyncSend(messages)}}

由于我們上面設置 producer.type=async,這里將會使用 asyncSend 異步發送模式。

asyncSend 源碼如下

private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {for (message <- messages) {val added = config.queueEnqueueTimeoutMs match {case 0  =>queue.offer(message)case _  =>try {config.queueEnqueueTimeoutMs < 0 match {case true =>queue.put(message)truecase _ =>queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)}}catch {case e: InterruptedException =>false}}if(!added) {producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)}else {trace("Added to send queue an event: " + message.toString)trace("Remaining queue size: " + queue.remainingCapacity)}}}

asyncSend 將會把消息加入到 LinkedBlockingQueue 阻塞隊列中。這里根據 config.queueEnqueueTimeoutMs參數使用不同方法。

config.queueEnqueueTimeoutMs=0,將會調用 LinkedBlockingQueue#offer,如果該隊列未滿,將會把元素插入隊列隊尾。如果隊列未滿,直接返回 false。所以如果此時隊列已滿,消息不再會加入隊列中,然后 asyncSend 將會拋出 QueueFullException 異常。

config.queueEnqueueTimeoutMs < 0,將會調用 LinkedBlockingQueue#put 加入元素,如果該隊列已滿,該方法將會一直被阻塞直到隊列存在可用空間。

config.queueEnqueueTimeoutMs > 0,將會調用 LinkedBlockingQueue#offer,這里與上面不同之處在于設置超時時間,如果隊列已滿將會阻塞知道超時。

config.queueEnqueueTimeoutMs參數通過 queue.enqueue.timeout.ms 配置生效,默認為 -1。默認情況下 LinkedBlockingQueue 最大數量為 10000,可以通過設置 queue.buffering.max.messages 改變隊列最大值。

消息放到隊列中后,Kafka 將會使用一個異步線程不斷從隊列中獲取消息,批量發送消息。

異步處理消息代碼如下

private def processEvents() {var lastSend = SystemTime.millisecondsvar events = new ArrayBuffer[KeyedMessage[K,V]]var full: Boolean = false// drain the queue until you get a shutdown commandStream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS)).takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {currentQueueItem =>val elapsed = (SystemTime.milliseconds - lastSend)// check if the queue time is reached. This happens when the poll method above returns after a timeout and// returns a null objectval expired = currentQueueItem == nullif(currentQueueItem != null) {trace("Dequeued item for topic %s, partition key: %s, data: %s".format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))events += currentQueueItem}// check if the batch size is reachedfull = events.size >= batchSizeif(full || expired) {if(expired)debug(elapsed + " ms elapsed. Queue time reached. Sending..")if(full)debug("Batch full. Sending..")// if either queue time has reached or batch size has reached, dispatch to event handlertryToHandle(events)lastSend = SystemTime.millisecondsevents = new ArrayBuffer[KeyedMessage[K,V]]}}// send the last batch of eventstryToHandle(events)if(queue.size > 0)throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue".format(queue.size))}

這里異步線程將會不斷從隊列中獲取任務,一旦條件滿足,就會批量發送任務。該條件為:

  1. 批量消息數量達到 200,可以設置 batch.num.messages 參數改變配置。
  2. 等待時間到達最大的超時時間,默認為 5000ms,可以設置 queue.buffering.max.ms 改變改配置。

四、問題解決辦法

上面問題雖然通過更換 Kafka 正確地址解決,但是為了預防下次該問題再發生,可以采用如下方案:

  1. 改變 config.queueEnqueueTimeoutMs默認配置,像這種系統監控日志允許丟失,所以可以設置 config.queueEnqueueTimeoutMs=0
  2. 升級 Kafka 版本,最新版本 Kafka 使用 Java 重寫發送端邏輯,不再使用阻塞隊列存儲消息。

本文首發于:studyidea.cn/kafka…

歡迎關注我的公眾號:程序通事,獲得日常干貨推送。如果您對我的專題內容感興趣,也可以關注我的博客:studyidea.cn

總結

以上是生活随笔為你收集整理的kafka实现异步发送_Kafka 异步消息也会阻塞?记一次 Dubbo 频繁超时排查过程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 一本大道av | xxxx国产片 | 米奇影视第四色 | 男人天堂伊人 | 神马午夜激情 | 成人毛片在线播放 | 97超碰国产在线 | 性爱免费视频 | 999在线观看视频 | 蜜臀av色欲a片无码精品一区 | 超碰在线91| aaa午夜| 荡女精品导航 | 清清草视频 | 在线播放精品视频 | 99久久影视 | 亚洲色图视频网站 | 婷婷五月精品中文字幕 | 夜夜嗨av一区二区三区 | 欧美激情在线一区 | 国产乱视频| 毛片网站在线看 | 国产情侣酒店自拍 | 久久久久97| 欧美日韩激情 | 国产成人免费观看视频 | 欧美婷婷六月丁香综合色 | 超碰在线c | 99久久久无码国产精品 | 黑人巨大精品欧美一区免费视频 | 亚洲欧洲精品在线 | 少妇性l交大片免费观看 | 激情www | 怨女1988国语版在线观看高清 | 国产成人看片 | 农村激情伦hxvideos | 国产老女人乱淫免费 | 天堂在线观看 | 丰满少妇熟乱xxxxx视频 | 真人毛片97级无遮挡精品 | 久久丝袜美腿 | 黄色片久久久 | 911亚洲精品 | 国产精品夜色一区二区三区 | 爽爽免费视频 | 亚洲成人视屏 | 久草播放 | 在线观看免费日韩av | 毛片aaaaa| 日韩欧美在线观看一区二区三区 | 狂野欧美性猛交xxxx777 | 久久青青操 | 国产福利片一区二区 | 女生高潮视频在线观看 | 天海翼av在线播放 | 精品国产一区二区三区久久 | 少妇毛片一区二区三区粉嫩av | 亚洲大胆| 欧美第一夜 | 色宗合| 日韩久久久久久久 | 亚洲最新视频 | 美女被变态侵犯 | 北岛玲av| 91精品国产91久久久久 | 亚洲欧洲无码一区二区三区 | 免费观看黄一级视频 | 懂色av粉嫩av蜜乳av | 亚洲色图网址 | 亚洲国产精品av | 久久久久国产精品人妻 | 我和岳m愉情xxxⅹ视频 | 国产男男网站 | 国产成人啪精品 | 91玉足脚交嫩脚丫在线播放 | 国产精品18久久久 | 很污的网站 | 日本一区二区在线免费 | 色玖玖| 爽插| 国产一区二区在线不卡 | 中文字幕xxxx| 激情四月 | 精品爆乳一区二区三区无码av | 亚洲一区二区小说 | 免费观看成人鲁鲁鲁鲁鲁视频 | 伊人中文网 | 免费人成视频在线播放 | 欧美一区不卡 | 亚洲品质自拍 | 亚洲欧美在线观看 | 国产一级黄| 国产91视频在线观看 | 日本成人免费在线视频 | r级无码视频在线观看 | 波多野结衣国产 | 五月天婷婷社区 | 欧美黑人又粗又大高潮喷水 | 无码人妻丰满熟妇区96 |