日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka解惑之Old Producer(3)——Async Analysis

發(fā)布時間:2024/4/11 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka解惑之Old Producer(3)——Async Analysis 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。

歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-async-analysis/


上接:

  • Kafka解惑之Old Producer(1)—— Beginning
  • Kafka解惑之Old Producer(2)——Sync Analysis
  • 講述完了Sync模式下的結構脈絡,下面就來聊一聊Async的,Async會將客戶端所要發(fā)送的消息暫存在LinkedBlockingQueue中,然后通過特制的ProducerSendThread來根據(jù)條件發(fā)送。這個LinkedBlockingQueue的長度大小是通過queue.buffering.max.messages這個參數(shù)設置的,默認值為10000。啟用異步模式時,producer緩存隊列里最大緩存的消息數(shù)量,如果超過這個值,producer就會阻塞或者丟掉消息。

    在講述Old Producer的開篇,我們展示過sync和async的代碼,不妨這里在贅述一下:

    config.producerType match {case "sync" =>case "async" =>sync = falseproducerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,queue,eventHandler,config.queueBufferingMaxMs,config.batchNumMessages,config.clientId)producerSendThread.start()}

    這里可以看到sync的情況什么都不需要做,而async的情況就需要開啟ProducerSendThread, 而在ProducerSendThread構造函數(shù)列表中的queue就是指客戶端暫存消息的LinkedBlockingQueue。
    注意ProducerSendThread構造函數(shù)列表中的config.queueBufferingMaxMs和config.batchNumMessages兩個參數(shù)分表對應Producer端的配置queue.buffering.max.ms和batch.num.messages,默認值分別為5000和200,這兩個參數(shù)后面會有用途。

    注意在Scala語言中是沒有break語句的,這點與Java不同,Scala的相當于每個case語句末尾都加上了一個break語句,所以讀到相關源碼的同學不要誤以為是Java中語法的那種情況。

    在async模式下,消息發(fā)送不是直接調用sync模式下的DefaultEventHandler的handle()方法,而是調用kafka.producer.Producer的asyncSend方法如下(只展示主要內容):

    private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {for (message <- messages) {val added = config.queueEnqueueTimeoutMs match {case 0 =>queue.offer(message)case _ =>try {if (config.queueEnqueueTimeoutMs < 0) {queue.put(message)true} else {queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)}}catch {case _: InterruptedException =>false}}}}

    可以看到消息入隊(存入LinkedBlockingQueue)中受到queue.enqueue.timeout.ms參數(shù)的影響,這個參數(shù)表示當LinkedBlockingQueue中消息達到上限queue.buffering.max.messages個數(shù)時所需要等待的時間。如果queue.enqueue.timeout.ms參數(shù)設置為0,表示不需要等待直接丟棄消息;如果設置為-1(默認值)則隊列滿時會阻塞等待。

    消息存入LinkedBlockingQueue中就需要一個異步的線程ProducerSendThread來執(zhí)行發(fā)送消息的操作,這個操作主要是通過kafka.producer.async.ProducerSendThread類中的processEvents方法來執(zhí)行。processEvents方法的具體細節(jié)如下:

    private def processEvents() {var lastSend = Time.SYSTEM.millisecondsvar events = new ArrayBuffer[KeyedMessage[K,V]]var full: Boolean = false// drain the queue until you get a shutdown commandIterator.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - Time.SYSTEM.milliseconds), TimeUnit.MILLISECONDS)).takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {currentQueueItem =>// check if the queue time is reached. This happens when the poll method above returns after a timeout and// returns a null objectval expired = currentQueueItem == nullif(currentQueueItem != null) {events += currentQueueItem}// check if the batch size is reachedfull = events.size >= batchSizeif(full || expired) {// if either queue time has reached or batch size has reached, dispatch to event handlertryToHandle(events)lastSend = Time.SYSTEM.millisecondsevents = new ArrayBuffer[KeyedMessage[K,V]]}}// send the last batch of eventstryToHandle(events)if(queue.size > 0)throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue".format(queue.size)) }

    一長串Scala源碼會不會看的一頭霧水?這里來一步一步的分析一下:

  • 首先持續(xù)的拉取queue(LinkedBlockingQueue)中的消息,注意這里用的是poll(long timeout, TimeUnit unit)方法,這個表示這里會等待一段時間之后再拉取隊列中的消息,這個等待的時間由queueTime也就是queue.buffering.max.ms這個參數(shù)設置。比如我們設置成1000時(默認為5000),它會大致緩存1s的數(shù)據(jù)再一次發(fā)送出去,這樣可以極大的增加broker吞吐量,但也會造成時效性的降低。
  • 如果拉取到了消息,那么就存儲緩存events(ArrayBuffer[KeyedMessage[K,V]])中,等到events中的消息大于batchSize大小,也就是batch.num.messages個數(shù)時再調用tryToHandle(events)來處理消息。
  • tryToHandle(events)就是調用DefaultEventHandler類中的handle()方法,接下去的工作就和Sync模式的相同。
  • 如果在等待的時間內沒有獲取到相應的消息,那么無需等待 events.size >= batchSize條件的滿足就可以發(fā)送消息。
  • 在講述Sync模式的時候筆者畫過一份結構圖,這里也來畫一幅Async結構圖來收尾,與Sync模式的類似,具體如下:

    歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-async-analysis/


    歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。


    總結

    以上是生活随笔為你收集整理的Kafka解惑之Old Producer(3)——Async Analysis的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。