日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka解惑之Old Producer(1)—— Beginning

發(fā)布時間:2024/4/11 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka解惑之Old Producer(1)—— Beginning 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關(guān)注筆者的微信公眾號:朱小廝的博客。

歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-beginning/


眾所周知,目前Kafka的最新版本已經(jīng)到達(dá)1.0.0,很多公司運(yùn)行的kafka也大多升級到了0.10.x版本,Kafka的Producer客戶端早已不再使用0.8.2.x就已基本停止維護(hù)的Scala版本的Producer了,那么我們還有必要了解它么?當(dāng)然很有必要,通過Kafka Old Producer我們可以了解Kafka變遷升級的歷史:舊版的Old Producer模型相對簡單利于初始了解,通過對Old Producer的了解也可以慢慢的發(fā)現(xiàn)隱患的問題,這樣進(jìn)一步可以研究探討解決方法,最后再通過對新版Producer的學(xué)習(xí)來提升對Kafka的認(rèn)知,與此同時也可以讓讀者在遇到相似問題的時候可以借鑒Kafka的優(yōu)化過來來優(yōu)化自己的應(yīng)用。以銅為鑒,可以正衣冠。

在使用Scala版本的Kafka生產(chǎn)者客戶端kafka.javaapi.producer.Producer時,實際上調(diào)用的是kafka.producer.Producer類。

package kafka.javaapi.producer class Producer[K, V](private val underlying : kafka.producer.Producer[K, V]) extends scala.AnyRef {def this(config : kafka.producer.ProducerConfig) = { /* compiled code */ }def send(message : kafka.producer.KeyedMessage[K, V]) : scala.Unit = { /* compiled code */ }def send(messages : java.util.List[kafka.producer.KeyedMessage[K, V]]) : scala.Unit = { /* compiled code */ }def close : scala.Unit = { /* compiled code */ } }

包括kafka-console-producer.sh的腳本(常用來測試發(fā)送消息之用)中,對于0.8.2.x版本如果不指定“-- new-producer”參數(shù);或者對于.0.0版本如果指定“-- old-producer”參數(shù)的話,實際上內(nèi)部調(diào)用的都是kafka.producer.Producer這個類。

對于kafka-console-producer.sh腳本的內(nèi)容如下:

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx512M" fi exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

我們看到實際上kafka-console-producer.sh的內(nèi)容就是運(yùn)行kafka.tools.ConsoleProducer而已,可以看到main函數(shù)代碼塊中的config.useOldProducer,這個筆者看的是1.0.0版本的代碼,而0.8.2.2版本中的ConsoleProducer對應(yīng)的是config.useNewProducer,稍有不同而已,不過如果都指定使用舊版的Scala的Producer,那么都是指kafka.producer.OldProducer。

object ConsoleProducer {def main(args: Array[String]) {try {val config = new ProducerConfig(args)val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]reader.init(System.in, getReaderProps(config))val producer =if(config.useOldProducer) {new OldProducer(getOldProducerProps(config))} else {new NewShinyProducer(getNewProducerProps(config))}

進(jìn)一步剖析,kafka.producer.OldProducer的內(nèi)部構(gòu)造很簡單,關(guān)鍵代碼如下:

class OldProducer(producerProps: Properties) extends BaseProducer {// default to byte array partitionerif (producerProps.getProperty("partitioner.class") == null)producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName)val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))

可以看到內(nèi)部的producer最終還是實例化的kafka.producer.Producer。最終驗證了開篇所述的舊版的Kafka生產(chǎn)者客戶端即為Kafka.producer.Producer。

新版的Java版的Kafka客戶端是:org.apache.kafka.clients.producer.KafkaProducer,讀者請注意區(qū)分。對于新版的KafkaProducer在以后的文章中會有詳細(xì)介紹。

下面就來深入了解下Kafka.producer.Producer(下面如無特殊說明都將Kafka.producer.Producer此簡稱為Producer)了。當(dāng)實例化Producer的時候,首先要讀取、解析以及校驗配置信息的合法性,根據(jù)配置信息來實例化Producer。Producer的配置項有18個,比如設(shè)置分區(qū)器、消息壓縮方式等,這些都比較好理解,而最主要的配置就是request.required.acks和producer.type這兩個配置。

request.required.acks是用來配置生產(chǎn)端消息確認(rèn)的方式,在0.8.x這個系列的版本之中,可以配置為0,1,-1的值,也可以配置為其他的整數(shù)值,用來控制一條消息經(jīng)由多少個ISR中的副本所在的Broker確認(rèn)之后才向客戶端發(fā)送確認(rèn)信息,這個參數(shù)在之后的版本,比如1.0.0版本中就只能設(shè)置0,1,-1(all)這3(4)種取值,分別表示:

  • 當(dāng)request.required.acks=0時,這意味著producer無需等待來自broker的確認(rèn)而繼續(xù)發(fā)送下一批消息。這種情況下數(shù)據(jù)傳輸效率最高,但是數(shù)據(jù)可靠性確是最低的。
  • 當(dāng)request.required.acks=1(默認(rèn))時,這意味著producer在ISR中的leader已成功收到數(shù)據(jù)并得到確認(rèn)。如果leader宕機(jī)了,則會丟失數(shù)據(jù)。
  • 當(dāng)request.required.acks=-1時,producer需要等待ISR中的所有follower都確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成,可靠性最高。但是這樣也不能保證數(shù)據(jù)不丟失,比如當(dāng)ISR中只有l(wèi)eader時,這樣就變成了acks=1的情況。為了提高數(shù)據(jù)的可靠性,可以通過min.insync.replicas參數(shù)來輔助作用,當(dāng)同步副本數(shù)不足時,生產(chǎn)者會跑出異常。
  • 有關(guān)kafka的消息可靠性的更深層次的講解可以參考我2017年初的一篇博客:kafka數(shù)據(jù)可靠性深度解讀,這篇博客主要是針對0.8.2.x版本的kafka做深層次的探討,后續(xù)會對1.0.0版本做進(jìn)一步的說明。

    Producer的發(fā)送模式分為同步(sync)和異步(async)兩種情況,這一點(diǎn)可以通過參數(shù)producer.type來配置。同步模式會將消息直接發(fā)往broker中,而異步模式則會將消息存入LinkedBlockingQueue中,然后通過一個ProducerSendThread來專門發(fā)送消息。為了便于說明,筆者這里先對同步模式的情況來做說明,而異步模式只是在同步模式的基礎(chǔ)上做了一些封裝而已。

    class Producer[K,V](val config: ProducerConfig,private val eventHandler: EventHandler[K,V]) // only for unit testingextends Logging {private val hasShutdown = new AtomicBoolean(false)private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)private var sync: Boolean = trueprivate var producerSendThread: ProducerSendThread[K,V] = nullprivate val lock = new Object()config.producerType match {case "sync" =>case "async" =>sync = falseproducerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,queue,eventHandler,config.queueBufferingMaxMs,config.batchNumMessages,config.clientId)producerSendThread.start()}

    在講述Producer的具體行為之前先來看一個發(fā)送方的Demo:

    public class ProducerScalaDemo {public static final String brokerList = "xxx.xxx.xxx.xxx:9092";public static final String topic = "topic-zzh";public static void main(String[] args) {Properties properties = new Properties();properties.put("serializer.class", "kafka.serializer.StringEncoder");properties.put("metadata.broker.list", brokerList);properties.put("producer.type", "sync");properties.put("request.required.acks", "1");Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(properties));String message = "kafka_message-" + new Date().getTime() + " edited by hidden.zhu";KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic,null, message);producer.send(keyedMessage);} }

    我們可以看到再初始化Producer的時候之用了ProducerConfig這一個類型的參數(shù),而在Producer的類定義中還用到了EventHandler這個類型的參數(shù)。在Scala語言中只有一個主構(gòu)造函數(shù),這個主構(gòu)造函數(shù)的參數(shù)列表就是跟在類名后面括號中的各個的參數(shù),如果要重載的話就需要自定義輔助構(gòu)造函數(shù),輔助構(gòu)造函數(shù)必須調(diào)用主構(gòu)造函數(shù)(this方法)。如此上面這個Demo中很顯然的就調(diào)用了輔助構(gòu)造函數(shù)來進(jìn)行實例化,那么我們再來看下其對應(yīng)的輔助構(gòu)造函數(shù):

    def this(config: ProducerConfig) =this(config,new DefaultEventHandler[K,V](config,CoreUtils.createObject[Partitioner](config.partitionerClass, config.props),CoreUtils.createObject[Encoder[V]](config.serializerClass, config.props),CoreUtils.createObject[Encoder[K]](config.keySerializerClass, config.props),new ProducerPool(config)))

    這里又引入了兩個新的東西:DefaultEventHandler和ProducerPool,這個DefaultEventHandler繼承了EventHandler這個類,這個是消息發(fā)送的關(guān)鍵。而ProducerPool內(nèi)部是一個HashMap,其中的key是broker的id,而value就是每個broker對應(yīng)的SyncProducer,這個SyncProducer就是真正的消息發(fā)送者。

    歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-beginning/


    歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關(guān)注筆者的微信公眾號:朱小廝的博客。


    總結(jié)

    以上是生活随笔為你收集整理的Kafka解惑之Old Producer(1)—— Beginning的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。