Kafka解惑之Old Producer(4)——Case Analysis
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-case-analysis/
上接:
在前面三篇文章中詳細了解了Old Producer的內容,本文主要通過一個實際應用案例來加深各位對Old Producer的理解。
問題描述:線上由多臺Kafka Broker組成的集群(Producer的metadata.broker.list參數設置的是所有broker的地址+端口號的列表),版本號為0.8.2.x,當一臺Kafka Broker的硬盤發生故障導致系統崩潰,由于Kafka的HA的作用線上業務無明顯異常,發送方和消費方的流量也與之前持穩,但是后面監測到每隔10分鐘左右就有少量的消息發送的時延很大,而且有ERROR告警報出。
2018-01-30 00:53:20 -[ERROR] - [fetching topic metadata for topics [Set(hidden-topic)] from broker [ArrayBuffer(id:0,host:xxx.xxx.xxx.xxx,port:9092)] failed] - [kafka.utils.Logging$$anonfun$swallowError$1:106] kafka.common.KafkaException: fetching topic metadata for topics [Set(hidden-topic)] from broker [ArrayBuffer(id:0,host:xxx.xxx.xxx.xxx,port:9092)] failedat kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)at kafka.utils.Utils$.swallow(Utils.scala:172)at kafka.utils.Logging$class.swallowError(Logging.scala:106)at kafka.utils.Utils$.swallowError(Utils.scala:45)at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)at kafka.producer.Producer.send(Producer.scala:77)at kafka.javaapi.producer.Producer.send(Producer.scala:33)通過上面的異常棧我們可以發現在獲取元數據的時候(kafka.client.ClientUtils$.fetchTopicMetadata)發生了異常,其實如果你對Kafka的配置參數足夠了解的話,看到10分鐘這個數值就可以聯想到600*1000的某個參數,也就是topic.metadata.refresh.interval.ms。在DefaultEventHandler的handle()方法中,在調用dispatchSerializedData()方法預處理并發送消息之前就會有下面的一個if判斷語言用來判斷當前是否需要更新元數據:
while (remainingRetries > 0 && outstandingProduceRequests.nonEmpty) {topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)if (topicMetadataRefreshInterval >= 0 &&Time.SYSTEM.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))sendPartitionPerTopicCache.clear()topicMetadataToRefresh.clearlastTopicMetadataRefreshTime = Time.SYSTEM.milliseconds}outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)上面代碼中的topicMetadataRefreshInterval指的就是topic.metadata.refresh.interval.ms參數。如果topic.metadata.refresh.interval.ms這個參數設置為0,那么就意味著每次發送消息之前都需要拉取并更新元數據信息,更新元數據信息之后還要更新ProducerPool的內容,包括重建SyncProducer,那么這一番操作必然會有一定的延遲,當然這個延遲沒有本文中所述問題中的延遲那么大。如果topic.metadata.refresh.interval.ms這個參數設置為負數,那么這個if條件語句就不能成立,也就是不會有定時更新元數據的操作,只有在獲取元數據信息失敗是才會請求完整的元數據信息。
本文中所述問題中采用的topic.metadata.refresh.interval.ms參數設置的是默認值大小,那么問題中還有一個細節:只有少量消息發送超時。為了進一步的一探究竟,我們來繼續深究。當定時更新元數據信息條件滿足時,就會調用brokerPartitionInfo.updateInfo的方法,更進取之后,實際上內部調用的是:ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)來請求元數據信息,并獲得TopicMetadataResponse來做更新。
TopicMetadataResponse中包含有broker的id、host、port,還有topic、topic中的partition、partition所對應的leader、AR和ISR等等,有興趣的同學可以進一步翻閱kafka.api.TopicMetadataResponse這個類,主體代碼只有30行左右,只要學一點Scala的構造函數相關的只是就能看懂。
我們回過頭來再來進一步分析ClientUtils.fetchTopicMetadata這個方法,詳細代碼如下:
def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {var fetchMetaDataSucceeded: Boolean = falsevar i: Int = 0val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)var topicMetadataResponse: TopicMetadataResponse = nullvar t: Throwable = null// shuffle the list of brokers before sending metadata requests so that most requests don't get routed to the// same brokerval shuffledBrokers = Random.shuffle(brokers)while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))try {topicMetadataResponse = producer.send(topicMetadataRequest)fetchMetaDataSucceeded = true}catch {case e: Throwable =>warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed".format(correlationId, topics, shuffledBrokers(i).toString), e)t = e} finally {i = i + 1producer.close()}}if(!fetchMetaDataSucceeded) {throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t)} else {debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))}topicMetadataResponse }fetchTopicMetadata方法參數列表中brokers代表metadata.broker.list所配置的地址列表。可以看到方法中首先建立TopicMetadataRequest的請求,然后從brokers中隨機挑選(做一個shuffle,然后從列表中的第一個開始取,也就是相當于隨機)一個broker建立SyncProducer并發送TopicMetadataRequest請求,問題的關鍵就在這個隨機挑選一個broker之上,如果正好隨機到的是那臺磁盤損毀而崩潰的機器,那么這個請求必定要等到設定的超時時間之后才能捕獲異常:[ERROR] - [fetching topic metadata for topics [Set(hidden-topic)] from broker [ArrayBuffer(id:0,host:xxx.xxx.xxx.xxx,port:9092)] failed] ,進而再找到下一個broker重新發送TopicMetadataRequest請求。
上面提到了超時時間,這個超時時間是通過request.timeout.ms參數設定的,默認值為10000,也就是10s。具體指的是kafka.network.BlockingChannel中的channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)這段代碼,參數connectTimeoutMs就是指request.timeout.ms。如果元數據的請求時打到那臺崩潰的broker上的話,那么元數據的請求就要耗時10s以上,待元數據刷新后才能發送消息。這個request.timeout.ms參數才是導致文中開頭有少量消息發送時延很大的原因。為了進一步驗證結論是否正確,筆者將相關的類SyncProducer、BlockingChannel用Java重寫了一遍,并測試出請求的耗時,當訪問一個不存在的ip地址時,從發送請求到異常報出的耗時在10194ms。不過如果訪問一個存在的ip地址時,但是沒有kafka服務的話,從發送請求到返回的耗時只有1248ms,基本上減少了一個數量級,如果讀者在遇到同樣的問題時,不妨上線一臺(隨意一臺能建立TCP連接的機器就好)與崩潰宕機的那臺broker一樣的ip地址的機器,上面無需運行kafka的服務,就能大大的降低發送消息的時延。這樣可以流出更多的時間去定位、修復、重新上線那臺崩潰的broker。
當然如果對時延過敏,還有一些其它的方法可以參考。比如metadata.broker.list配置的是一個類似虛IP(VIP)的話,可以在VIP的下游剔除掉這臺broker,讓VIP過來的請求不會落到崩潰的broker上。或者也可以通過topic.metadata.refresh.interval.ms參數來調節,比如設置為負數就可以免去定時刷新元數據的煩惱,不過在元數據變動的時候很難有效的感知其變化,通過定時重連之類的方法又顯得有點奇葩。當然直接上線一臺服務器,安裝運行kafka服務,且設置這臺kafka服務器的ip為崩潰的那臺服務器的ip地址,不過這番操作比直接拉一臺空機器的時效性要低一些,凡事在于抉擇。
Old Producer拉取元數據信息以及發送消息是在同一個線程中的,這必然會引起局部消息的時延增大。不過在新版的KafkaProducer中,這些問題都已經迎刃而解,具體怎么處理,且看后面的文章分析。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-case-analysis/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的Kafka解惑之Old Producer(4)——Case Analysis的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka解惑之Old Producer
- 下一篇: Kafka解析之topic创建(2)