Kafka解惑之Old Producer(1)—— Beginning
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-beginning/
眾所周知,目前Kafka的最新版本已經到達1.0.0,很多公司運行的kafka也大多升級到了0.10.x版本,Kafka的Producer客戶端早已不再使用0.8.2.x就已基本停止維護的Scala版本的Producer了,那么我們還有必要了解它么?當然很有必要,通過Kafka Old Producer我們可以了解Kafka變遷升級的歷史:舊版的Old Producer模型相對簡單利于初始了解,通過對Old Producer的了解也可以慢慢的發現隱患的問題,這樣進一步可以研究探討解決方法,最后再通過對新版Producer的學習來提升對Kafka的認知,與此同時也可以讓讀者在遇到相似問題的時候可以借鑒Kafka的優化過來來優化自己的應用。以銅為鑒,可以正衣冠。
在使用Scala版本的Kafka生產者客戶端kafka.javaapi.producer.Producer時,實際上調用的是kafka.producer.Producer類。
package kafka.javaapi.producer class Producer[K, V](private val underlying : kafka.producer.Producer[K, V]) extends scala.AnyRef {def this(config : kafka.producer.ProducerConfig) = { /* compiled code */ }def send(message : kafka.producer.KeyedMessage[K, V]) : scala.Unit = { /* compiled code */ }def send(messages : java.util.List[kafka.producer.KeyedMessage[K, V]]) : scala.Unit = { /* compiled code */ }def close : scala.Unit = { /* compiled code */ } }包括kafka-console-producer.sh的腳本(常用來測試發送消息之用)中,對于0.8.2.x版本如果不指定“-- new-producer”參數;或者對于.0.0版本如果指定“-- old-producer”參數的話,實際上內部調用的都是kafka.producer.Producer這個類。
對于kafka-console-producer.sh腳本的內容如下:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx512M" fi exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"我們看到實際上kafka-console-producer.sh的內容就是運行kafka.tools.ConsoleProducer而已,可以看到main函數代碼塊中的config.useOldProducer,這個筆者看的是1.0.0版本的代碼,而0.8.2.2版本中的ConsoleProducer對應的是config.useNewProducer,稍有不同而已,不過如果都指定使用舊版的Scala的Producer,那么都是指kafka.producer.OldProducer。
object ConsoleProducer {def main(args: Array[String]) {try {val config = new ProducerConfig(args)val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]reader.init(System.in, getReaderProps(config))val producer =if(config.useOldProducer) {new OldProducer(getOldProducerProps(config))} else {new NewShinyProducer(getNewProducerProps(config))}進一步剖析,kafka.producer.OldProducer的內部構造很簡單,關鍵代碼如下:
class OldProducer(producerProps: Properties) extends BaseProducer {// default to byte array partitionerif (producerProps.getProperty("partitioner.class") == null)producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName)val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))可以看到內部的producer最終還是實例化的kafka.producer.Producer。最終驗證了開篇所述的舊版的Kafka生產者客戶端即為Kafka.producer.Producer。
新版的Java版的Kafka客戶端是:org.apache.kafka.clients.producer.KafkaProducer,讀者請注意區分。對于新版的KafkaProducer在以后的文章中會有詳細介紹。
下面就來深入了解下Kafka.producer.Producer(下面如無特殊說明都將Kafka.producer.Producer此簡稱為Producer)了。當實例化Producer的時候,首先要讀取、解析以及校驗配置信息的合法性,根據配置信息來實例化Producer。Producer的配置項有18個,比如設置分區器、消息壓縮方式等,這些都比較好理解,而最主要的配置就是request.required.acks和producer.type這兩個配置。
request.required.acks是用來配置生產端消息確認的方式,在0.8.x這個系列的版本之中,可以配置為0,1,-1的值,也可以配置為其他的整數值,用來控制一條消息經由多少個ISR中的副本所在的Broker確認之后才向客戶端發送確認信息,這個參數在之后的版本,比如1.0.0版本中就只能設置0,1,-1(all)這3(4)種取值,分別表示:
有關kafka的消息可靠性的更深層次的講解可以參考我2017年初的一篇博客:kafka數據可靠性深度解讀,這篇博客主要是針對0.8.2.x版本的kafka做深層次的探討,后續會對1.0.0版本做進一步的說明。
Producer的發送模式分為同步(sync)和異步(async)兩種情況,這一點可以通過參數producer.type來配置。同步模式會將消息直接發往broker中,而異步模式則會將消息存入LinkedBlockingQueue中,然后通過一個ProducerSendThread來專門發送消息。為了便于說明,筆者這里先對同步模式的情況來做說明,而異步模式只是在同步模式的基礎上做了一些封裝而已。
class Producer[K,V](val config: ProducerConfig,private val eventHandler: EventHandler[K,V]) // only for unit testingextends Logging {private val hasShutdown = new AtomicBoolean(false)private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)private var sync: Boolean = trueprivate var producerSendThread: ProducerSendThread[K,V] = nullprivate val lock = new Object()config.producerType match {case "sync" =>case "async" =>sync = falseproducerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,queue,eventHandler,config.queueBufferingMaxMs,config.batchNumMessages,config.clientId)producerSendThread.start()}在講述Producer的具體行為之前先來看一個發送方的Demo:
public class ProducerScalaDemo {public static final String brokerList = "xxx.xxx.xxx.xxx:9092";public static final String topic = "topic-zzh";public static void main(String[] args) {Properties properties = new Properties();properties.put("serializer.class", "kafka.serializer.StringEncoder");properties.put("metadata.broker.list", brokerList);properties.put("producer.type", "sync");properties.put("request.required.acks", "1");Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(properties));String message = "kafka_message-" + new Date().getTime() + " edited by hidden.zhu";KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic,null, message);producer.send(keyedMessage);} }我們可以看到再初始化Producer的時候之用了ProducerConfig這一個類型的參數,而在Producer的類定義中還用到了EventHandler這個類型的參數。在Scala語言中只有一個主構造函數,這個主構造函數的參數列表就是跟在類名后面括號中的各個的參數,如果要重載的話就需要自定義輔助構造函數,輔助構造函數必須調用主構造函數(this方法)。如此上面這個Demo中很顯然的就調用了輔助構造函數來進行實例化,那么我們再來看下其對應的輔助構造函數:
def this(config: ProducerConfig) =this(config,new DefaultEventHandler[K,V](config,CoreUtils.createObject[Partitioner](config.partitionerClass, config.props),CoreUtils.createObject[Encoder[V]](config.serializerClass, config.props),CoreUtils.createObject[Encoder[K]](config.keySerializerClass, config.props),new ProducerPool(config)))這里又引入了兩個新的東西:DefaultEventHandler和ProducerPool,這個DefaultEventHandler繼承了EventHandler這個類,這個是消息發送的關鍵。而ProducerPool內部是一個HashMap,其中的key是broker的id,而value就是每個broker對應的SyncProducer,這個SyncProducer就是真正的消息發送者。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-beginning/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的Kafka解惑之Old Producer(1)—— Beginning的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux IO磁盘篇整理小记
- 下一篇: Kafka解惑之Old Producer