kafka实现异步发送_Kafka 异步消息也会阻塞?记一次 Dubbo 频繁超时排查过程
線上某服務(wù) A 調(diào)用服務(wù) B 接口完成一次交易,一次晚上的生產(chǎn)變更之后,系統(tǒng)監(jiān)控發(fā)現(xiàn)服務(wù) B 接口頻繁超時(shí),后續(xù)甚至返回線程池耗盡錯(cuò)誤 Thread pool is EXHAUSTED。因?yàn)榉?wù) B 依賴外部接口,剛開始誤以為外部接口延時(shí)導(dǎo)致,所以臨時(shí)增加服務(wù) B dubbo 線程池線程數(shù)量。配置變更之后,重啟服務(wù),服務(wù)恢復(fù)正常。一段時(shí)間之后,服務(wù) B 再次返回線程池耗盡錯(cuò)誤。這次深入排查問題之后,才發(fā)現(xiàn) Kafka 異步發(fā)送消息阻塞了 dubbo 線程,從而導(dǎo)致調(diào)用超時(shí)。
一、問題分析
Dubbo 2.6.5,Kafak maven 0.8.0-beta1
服務(wù) A 調(diào)用服務(wù) B,收到如下錯(cuò)誤:
2019-08-30 09:14:52,311 WARN method [%f [DUBBO] Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-xxxx, Pool Size: 1000 (active: 1000, core: 1000, max: 1000, largest: 1000), Task: 6491 (completed: 5491), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://xxxx!, dubbo version: 2.6.0, current host: 127.0.0.1可以看到當(dāng)前 dubbo 線程池已經(jīng)滿載運(yùn)行,不能再接受新的調(diào)用。正常情況下 dubbo 線程可以很快完成任務(wù),然后歸還到線程池中。由于線程執(zhí)行的任務(wù)發(fā)生阻塞,消費(fèi)者端調(diào)用超時(shí)。而服務(wù)提供者端由于已有線程被阻塞,線程池必須不斷創(chuàng)建新線程處理任務(wù),直到線程數(shù)量達(dá)到最大數(shù)量,系統(tǒng)返回 Thread pool is EXHAUSTED。
線程任務(wù)長(zhǎng)時(shí)間被阻塞可能原因有:
- 頻繁的 fullgc,導(dǎo)致系統(tǒng)暫停。
- 調(diào)用某些阻塞 API,如 socket 連接未設(shè)置超時(shí)時(shí)間導(dǎo)致阻塞。
- 系統(tǒng)內(nèi)部死鎖
通過分析系統(tǒng)堆棧 dump 情況,果然發(fā)現(xiàn)所有 dubbo 線程都處于 WATTING 狀態(tài)。
下圖為應(yīng)用堆棧 dump 日志:
從堆棧日志可以看到 dubbo 線程最后阻塞在 LinkedBlockingQueue#put ,而該阻塞發(fā)生在 Kafka 發(fā)送消息方法內(nèi)。
這里服務(wù) B 需要使用 Kafka 發(fā)送監(jiān)控消息,為了消息發(fā)送不影響主業(yè)務(wù),這里使用 Kafka 異步發(fā)送消息。由于 Kafka 服務(wù)端最近更換了對(duì)外的端口,而服務(wù) B Kafka 配置未及時(shí)變更。最后服務(wù) B 修改配置,服務(wù)重新啟動(dòng),該問題得以解決。
二、Kafka 異步模式
下面分析 Kafka 異步發(fā)送消息阻塞的實(shí)際原因。
0.8.0 Kafka 默認(rèn)使用同步模式發(fā)送消息,異步發(fā)送消息需要設(shè)置producer.type=async屬性。同步模式需要等待 Kafka 將消息發(fā)送到消息隊(duì)列,這個(gè)過程當(dāng)然會(huì)阻塞主線程。而異步模式最大的優(yōu)點(diǎn)在于無(wú)需要等待 Kafka 這個(gè)發(fā)送過程。
原本認(rèn)為這里的異步是使用子線程去運(yùn)行任務(wù),但是 Kafka 異步模式并非這樣。查看 Kafka 官方文檔producer,可以看到對(duì)異步模式描述。
Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer has an asynchronous mode that accumulates data in memory and sends out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 100 messages or 5 seconds). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. Since this buffering happens in the client it obviously reduces the durability as any data buffered in memory and not yet sent will be lost in the event of a producer crash.
從上我們可以看到,Kafka 異步模式將會(huì)把多條消息打包一塊批量發(fā)送到服務(wù)端。這種模式將會(huì)先把消息放到內(nèi)存隊(duì)列中,直到消息到達(dá)一定數(shù)量(默認(rèn)為 200)或者等待時(shí)間超限(默認(rèn)為 5000ms)。
這么做最大好處在于提高消息發(fā)送的吞吐量,減少網(wǎng)絡(luò) I/O。當(dāng)然這么做也存在明顯劣勢(shì),如果生產(chǎn)者宕機(jī),在內(nèi)存中還未發(fā)送消息可能就會(huì)丟失。
下面從 kafka 源碼分析這個(gè)阻塞過程。
三、Kafka 源碼解析
Kafka 消息發(fā)送端采用如下配置:
Properties props = new Properties();props.put("metadata.broker.list", "localhost:9092");// 選擇異步發(fā)送props.put("producer.type", "async");props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("queue.buffering.max.messages","1");props.put("batch.num.messages","1");Producer<Integer, String> producer= new Producer(new ProducerConfig(props));producer.send(new KeyedMessage("test", "hello world"));這里設(shè)置 producer.type=async,從而使 Kafka 異步發(fā)送消息。
send 方法源碼如下:
ps: 這個(gè)版本 Kafka 源碼采用 Scala 編寫,不過源碼還是比較簡(jiǎn)單,比較容易閱讀。
def send(messages: KeyedMessage[K,V]*) {if (hasShutdown.get)throw new ProducerClosedExceptionrecordStats(messages)sync match {case true => eventHandler.handle(messages)// 由于 producer.type=async 異步發(fā)送case false => asyncSend(messages)}}由于我們上面設(shè)置 producer.type=async,這里將會(huì)使用 asyncSend 異步發(fā)送模式。
asyncSend 源碼如下:
private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {for (message <- messages) {val added = config.queueEnqueueTimeoutMs match {case 0 =>queue.offer(message)case _ =>try {config.queueEnqueueTimeoutMs < 0 match {case true =>queue.put(message)truecase _ =>queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)}}catch {case e: InterruptedException =>false}}if(!added) {producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)}else {trace("Added to send queue an event: " + message.toString)trace("Remaining queue size: " + queue.remainingCapacity)}}}asyncSend 將會(huì)把消息加入到 LinkedBlockingQueue 阻塞隊(duì)列中。這里根據(jù) config.queueEnqueueTimeoutMs參數(shù)使用不同方法。
當(dāng) config.queueEnqueueTimeoutMs=0,將會(huì)調(diào)用 LinkedBlockingQueue#offer,如果該隊(duì)列未滿,將會(huì)把元素插入隊(duì)列隊(duì)尾。如果隊(duì)列未滿,直接返回 false。所以如果此時(shí)隊(duì)列已滿,消息不再會(huì)加入隊(duì)列中,然后 asyncSend 將會(huì)拋出 QueueFullException 異常。
當(dāng) config.queueEnqueueTimeoutMs < 0,將會(huì)調(diào)用 LinkedBlockingQueue#put 加入元素,如果該隊(duì)列已滿,該方法將會(huì)一直被阻塞直到隊(duì)列存在可用空間。
當(dāng) config.queueEnqueueTimeoutMs > 0,將會(huì)調(diào)用 LinkedBlockingQueue#offer,這里與上面不同之處在于設(shè)置超時(shí)時(shí)間,如果隊(duì)列已滿將會(huì)阻塞知道超時(shí)。
config.queueEnqueueTimeoutMs參數(shù)通過 queue.enqueue.timeout.ms 配置生效,默認(rèn)為 -1。默認(rèn)情況下 LinkedBlockingQueue 最大數(shù)量為 10000,可以通過設(shè)置 queue.buffering.max.messages 改變隊(duì)列最大值。
消息放到隊(duì)列中后,Kafka 將會(huì)使用一個(gè)異步線程不斷從隊(duì)列中獲取消息,批量發(fā)送消息。
異步處理消息代碼如下:
private def processEvents() {var lastSend = SystemTime.millisecondsvar events = new ArrayBuffer[KeyedMessage[K,V]]var full: Boolean = false// drain the queue until you get a shutdown commandStream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS)).takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {currentQueueItem =>val elapsed = (SystemTime.milliseconds - lastSend)// check if the queue time is reached. This happens when the poll method above returns after a timeout and// returns a null objectval expired = currentQueueItem == nullif(currentQueueItem != null) {trace("Dequeued item for topic %s, partition key: %s, data: %s".format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))events += currentQueueItem}// check if the batch size is reachedfull = events.size >= batchSizeif(full || expired) {if(expired)debug(elapsed + " ms elapsed. Queue time reached. Sending..")if(full)debug("Batch full. Sending..")// if either queue time has reached or batch size has reached, dispatch to event handlertryToHandle(events)lastSend = SystemTime.millisecondsevents = new ArrayBuffer[KeyedMessage[K,V]]}}// send the last batch of eventstryToHandle(events)if(queue.size > 0)throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue".format(queue.size))}這里異步線程將會(huì)不斷從隊(duì)列中獲取任務(wù),一旦條件滿足,就會(huì)批量發(fā)送任務(wù)。該條件為:
- 批量消息數(shù)量達(dá)到 200,可以設(shè)置
batch.num.messages參數(shù)改變配置。 - 等待時(shí)間到達(dá)最大的超時(shí)時(shí)間,默認(rèn)為 5000ms,可以設(shè)置
queue.buffering.max.ms改變改配置。
四、問題解決辦法
上面問題雖然通過更換 Kafka 正確地址解決,但是為了預(yù)防下次該問題再發(fā)生,可以采用如下方案:
- 改變
config.queueEnqueueTimeoutMs默認(rèn)配置,像這種系統(tǒng)監(jiān)控日志允許丟失,所以可以設(shè)置config.queueEnqueueTimeoutMs=0。 - 升級(jí) Kafka 版本,最新版本 Kafka 使用 Java 重寫發(fā)送端邏輯,不再使用阻塞隊(duì)列存儲(chǔ)消息。
本文首發(fā)于:studyidea.cn/kafka…
歡迎關(guān)注我的公眾號(hào):程序通事,獲得日常干貨推送。如果您對(duì)我的專題內(nèi)容感興趣,也可以關(guān)注我的博客:studyidea.cn
總結(jié)
以上是生活随笔為你收集整理的kafka实现异步发送_Kafka 异步消息也会阻塞?记一次 Dubbo 频繁超时排查过程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 验血要多少钱啊?
- 下一篇: php网课资源百度云盘_安全中国PHP网