日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka0.9 java commit_0.9版本kafka优化及常见错误(转载)

發布時間:2024/10/8 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka0.9 java commit_0.9版本kafka优化及常见错误(转载) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Kafka設計的初衷是迅速處理短小的消息,一般10K大小的消息吞吐性能最好(可參見LinkedIn的kafka性能測試)。但有時候,我們需要處理更大的消息,比如XML文檔或JSON內容,一個消息差不多有10-100M,這種情況下,Kakfa應該如何處理?

針對這個問題,有以下幾個建議:

最好的方法是不直接傳送這些大的數據。如果有共享存儲,如NAS, HDFS,

S3等,可以把這些大的文件存放到共享存儲,然后使用Kafka來傳送文件的位置信息。

第二個方法是,將大的消息數據切片或切塊,在生產端將數據切片為10K大小,使用分區主鍵確保一個大消息的所有部分會被發送到同一個kafka分區(這樣每一部分的拆分順序得以保留),如此以來,當消費端使用時會將這些部分重新還原為原始的消息。

第三,Kafka的生產端可以壓縮消息,如果原始消息是XML,當通過壓縮之后,消息可能會變得不那么大。在生產端的配置參數中使用compression.codec和commpressed.topics可以開啟壓縮功能,壓縮算法可以使用GZip或Snappy。

1. Broker配置

message.max.bytes

默認:1000000。broker能接收消息的最大字節數,這個值應該比消費端的fetch.message.max.bytes更小才對,否則broker就會因為消費端無法使用這個消息而掛起。

log.segment.bytes

默認:

1GB。kafka數據文件的大小,確保這個數值大于一個消息的長度。一般說來使用默認值即可(一般一個消息很難大于1G,因為這是一個消息系統,而不是文件系統)。

replica.fetch.max.bytes

默認:

1MB。broker可復制的消息的最大字節數。這個值應該比message.max.bytes大,否則broker會接收此消息,但無法將此消息復制出去,從而造成數據丟失。

2. Producer配置

producer方面沒有什么太多需要優化的,最需要注意的一點是:不要使用0.8版本的Producer。

0.8版本的Producer在面對大量數據的寫入時,會導致producer端使用的直接內存無法釋放,最終導致應用被操作系統中斷掉。

3. Consumer配置

fetch.message.max.bytes

默認 1MB。消費者能讀取的最大消息。這個值應該大于或等于message.max.bytes。

所以,如果你一定要選擇kafka來傳送大的消息,還有些事項需要考慮。要傳送大的消息,不是當出現問題之后再來考慮如何解決,而是在一開始設計的時候,就要考慮到大消息對集群和主題的影響。

性能

根據前面提到的性能測試,kafka在消息為10K時吞吐量達到最大,更大的消息會降低吞吐量,在設計集群的容量時,尤其要考慮這點。

可用的內存和分區數

Brokers會為每個分區分配replica.fetch.max.bytes參數指定的內存空間,假設replica.fetch.max.bytes=1M,且有1000個分區,則需要差不多1G的內存,確保

分區數最大的消息不會超過服務器的內存,否則會報OOM錯誤。同樣地,消費端的fetch.message.max.bytes指定了最大消息需要的內存空間,同樣,分區數最大需要內存空間

不能超過服務器的內存。所以,如果你有大的消息要傳送,則在內存一定的情況下,只能使用較少的分區數或者使用更大內存的服務器。

垃圾回收

到現在為止,我在kafka的使用中還沒發現過此問題,但這應該是一個需要考慮的潛在問題。更大的消息會讓GC的時間更長(因為broker需要分配更大的塊),隨時關注GC的日志和服務器的日志信息。如果長時間的GC導致kafka丟失了zookeeper的會話,則需要配置zookeeper.session.timeout.ms參數為更大的超時時間。

4. 常見錯誤

4.1 kafka.common.MessageSizeTooLargeException

這個異常的原因比較明顯,單個消息太大了,到官網找找配置就可以解決。

如果是生產者報錯,修改Kafka

Broker的配置,在server.properties中配置message.max.bytes,默認是1M(約)。

如果是消費者報錯,修改消費者中增加fetch.message.max.bytes的配置,這個配置的值要大于Broker的message.max.bytes配置。

修改了Broker的message.max.bytes,同時需要修改replica.fetch.max.bytes,并且replica.fetch.max.bytes要大于message.max.bytes。

4.2 Ierator is in failed state

這真的是一個神一般存在的錯誤提示。Consumer消費時出現Iterator is in failed

state的錯誤提示,錯誤量很多,并且consumer不再消費kafka了,造成消息堆積。具體報錯如下:

[ERROR]06-17 10:19:09 com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.

java.lang.IllegalStateException: Iterator is in failed state

at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)

at com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.

at com.test.monitorlogservice.processor.LogProcessor.access$200(LogProcessor.

at com.test.monitorlogservice.processor.LogProcessor$1.run(LogProcessor.

at

...

但這個錯誤并不是真正的錯誤,是因為MessageSizeTooLargeException導致的,發生MessageSizeTooLargeException異常會導致Iterator

is in failed

state錯誤的發生,但是MessageSizeTooLargeException只會打印一次,而Iterator is in

failed state錯誤會隨著讀取方法的調用不停的打,完全能將錯誤分析方向帶偏了。具體如下:

kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic LogProd1 partition 27 at fetch offset 114. Increase the fetch size, or decrease the maximum message size the br

oker will allow.

at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:90)

at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)

at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)

at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)

at com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.

at com.test.monitorlogservice.processor.LogProcessor.access$200(LogProcessor.

at com.test.monitorlogservice.processor.LogProcessor$1.run(LogProcessor.

at

[ERROR]06-17 10:19:09 com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.

java.lang.IllegalStateException: Iterator is in failed state

at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)

at com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.

at com.test.monitorlogservice.processor.LogProcessor.access$200(LogProcessor.

at com.test.monitorlogservice.processor.LogProcessor$1.run(LogProcessor.

at

[ERROR]06-17 10:19:09 com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.

java.lang.IllegalStateException: Iterator is in failed state

at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)

at com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.

at com.test.monitorlogservice.processor.LogProcessor.access$200(LogProcessor.

at com.test.monitorlogservice.processor.LogProcessor$1.run(LogProcessor.

at

[ERROR]06-17 10:19:09 com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.

java.lang.IllegalStateException: Iterator is in failed state

at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)

at com.test.monitorlogservice.processor.LogProcessor.getNext(LogProcessor.

at com.test.monitorlogservice.processor.LogProcessor.access$200(LogProcessor.

at com.test.monitorlogservice.processor.LogProcessor$1.run(LogProcessor.

at

解決MessageSizeTooLargeException即可。

4.3 kafka.consumer.ConsumerTimeoutException

這個錯誤提示也比較直白,消費者消費數據時超時了。

默認情況下是阻塞式消費數據,不會報這個錯誤。

如果consumer設置consumer.timeout.ms,則消費者消費數據時超時就會報錯。

消費者消費數據超時的一種情況就是所有數據均被消費完了。

4.4 java.io.IOException: Broken pipe

4.5 java.io.IOException: Connection reset by peer

4.6

org.apache.kafka.clients.consumer.CommitFailedException: Commit

cannot be completed due to group rebalance

這個錯誤提示比較直白,意思是消費者消費了數據,但在規定時間內沒有commit,所以kafka認為這個consumer掛掉了,這時對consumer的group進行再平衡。

解決方法有三種:

增加消費超時時間。消費超時時間通過heartbeat.interval.ms設置,heartbeat.interval.ms的大小不能超過session.timeout.ms,session.timeout.ms必須在[group.min.session.timeout.ms,

group.max.session.timeout.ms]范圍內。

減少消息處理時間;由后端處理決定。

減少一次消費的消息量。max.partition.fetch.bytes決定容量,max.poll.records決定數量。max.partition.fetch.bytes規定了一個partition一次pull獲取的獲取的數據大小。max.poll.records規定一次pull獲取的消息數量。

轉載地址:http://blog.csdn.net/weitry/article/details/53009134

總結

以上是生活随笔為你收集整理的kafka0.9 java commit_0.9版本kafka优化及常见错误(转载)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。