Kafka解惑之Old Producer(3)——Async Analysis
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-async-analysis/
上接:
講述完了Sync模式下的結構脈絡,下面就來聊一聊Async的,Async會將客戶端所要發(fā)送的消息暫存在LinkedBlockingQueue中,然后通過特制的ProducerSendThread來根據(jù)條件發(fā)送。這個LinkedBlockingQueue的長度大小是通過queue.buffering.max.messages這個參數(shù)設置的,默認值為10000。啟用異步模式時,producer緩存隊列里最大緩存的消息數(shù)量,如果超過這個值,producer就會阻塞或者丟掉消息。
在講述Old Producer的開篇,我們展示過sync和async的代碼,不妨這里在贅述一下:
config.producerType match {case "sync" =>case "async" =>sync = falseproducerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,queue,eventHandler,config.queueBufferingMaxMs,config.batchNumMessages,config.clientId)producerSendThread.start()}這里可以看到sync的情況什么都不需要做,而async的情況就需要開啟ProducerSendThread, 而在ProducerSendThread構造函數(shù)列表中的queue就是指客戶端暫存消息的LinkedBlockingQueue。
注意ProducerSendThread構造函數(shù)列表中的config.queueBufferingMaxMs和config.batchNumMessages兩個參數(shù)分表對應Producer端的配置queue.buffering.max.ms和batch.num.messages,默認值分別為5000和200,這兩個參數(shù)后面會有用途。
注意在Scala語言中是沒有break語句的,這點與Java不同,Scala的相當于每個case語句末尾都加上了一個break語句,所以讀到相關源碼的同學不要誤以為是Java中語法的那種情況。
在async模式下,消息發(fā)送不是直接調用sync模式下的DefaultEventHandler的handle()方法,而是調用kafka.producer.Producer的asyncSend方法如下(只展示主要內容):
private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {for (message <- messages) {val added = config.queueEnqueueTimeoutMs match {case 0 =>queue.offer(message)case _ =>try {if (config.queueEnqueueTimeoutMs < 0) {queue.put(message)true} else {queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)}}catch {case _: InterruptedException =>false}}}}可以看到消息入隊(存入LinkedBlockingQueue)中受到queue.enqueue.timeout.ms參數(shù)的影響,這個參數(shù)表示當LinkedBlockingQueue中消息達到上限queue.buffering.max.messages個數(shù)時所需要等待的時間。如果queue.enqueue.timeout.ms參數(shù)設置為0,表示不需要等待直接丟棄消息;如果設置為-1(默認值)則隊列滿時會阻塞等待。
消息存入LinkedBlockingQueue中就需要一個異步的線程ProducerSendThread來執(zhí)行發(fā)送消息的操作,這個操作主要是通過kafka.producer.async.ProducerSendThread類中的processEvents方法來執(zhí)行。processEvents方法的具體細節(jié)如下:
private def processEvents() {var lastSend = Time.SYSTEM.millisecondsvar events = new ArrayBuffer[KeyedMessage[K,V]]var full: Boolean = false// drain the queue until you get a shutdown commandIterator.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - Time.SYSTEM.milliseconds), TimeUnit.MILLISECONDS)).takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {currentQueueItem =>// check if the queue time is reached. This happens when the poll method above returns after a timeout and// returns a null objectval expired = currentQueueItem == nullif(currentQueueItem != null) {events += currentQueueItem}// check if the batch size is reachedfull = events.size >= batchSizeif(full || expired) {// if either queue time has reached or batch size has reached, dispatch to event handlertryToHandle(events)lastSend = Time.SYSTEM.millisecondsevents = new ArrayBuffer[KeyedMessage[K,V]]}}// send the last batch of eventstryToHandle(events)if(queue.size > 0)throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue".format(queue.size)) }一長串Scala源碼會不會看的一頭霧水?這里來一步一步的分析一下:
在講述Sync模式的時候筆者畫過一份結構圖,這里也來畫一幅Async結構圖來收尾,與Sync模式的類似,具體如下:
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-async-analysis/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的Kafka解惑之Old Producer(3)——Async Analysis的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka解惑之Old Producer
- 下一篇: Kafka解惑之Old Producer